In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession , Row
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, MapType , BooleanType
from bs4 import BeautifulSoup

In [3]:
import sys , os
!{sys.executable} -m pip install beautifulsoup4



In [4]:
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [5]:
os.environ["HADOOP_HOME"] = "C:/Spark/spark-3.5.5-bin-hadoop3"  
os.environ["HADOOP_OPTS"] = "-Djava.library.path=C:/Spark/spark-3.5.5-bin-hadoop3/bin"

In [6]:
try:
    spark.stop()
except:
    pass

spark = SparkSession.builder.appName("SilverTransformations")\
    .config("spark.executor.memory", "4g")\
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.16.0,org.apache.parquet:parquet-hadoop:1.15.1")\
    .config("spark.pyspark.python", sys.executable) \
    .config("spark.pyspark.driver.python", sys.executable) \
    .config("spark.hadoop.io.native.lib.available", "false")\
    .getOrCreate()

In [7]:
input_path = "Dataset"
output_path = "SilverDataSet"

In [None]:
comments_schema = StructType([
    StructField("_Id", IntegerType(), False),
    StructField("_PostId", IntegerType(), True),
    StructField("_UserId", IntegerType(), True),
    StructField("_UserDisplayName", StringType(), True),
    StructField("_Score", IntegerType(), True),
    StructField("_Text", StringType(), True),
    StructField("_CreationDate", TimestampType(), True),
    StructField("_ContentLicense", StringType(), True)
])

votes_schema=StructType([
    StructField("Id", IntegerType(), False),
    StructField("PostId",IntegerType(), True),
    StructField("VoteTypeId", IntegerType(), True),
    StructField("CreationDate", TimestampType(), True),
    StructField("UserId",IntegerType(), True),
    StructField("BountyAmount", IntegerType(), True),
])

Badges_schema = StructType([
    StructField("_Class", IntegerType(), True),
    StructField("_Date", TimestampType(), True),  
    StructField("_Id", IntegerType(), False),     
    StructField("_Name", StringType(), True),
    StructField("_TagBased", BooleanType(), True),
    StructField("_UserId", IntegerType(), True)  
])
posts_schema=StructType([
    StructField("AcceptedAnswerId", IntegerType(), True),
    StructField("AnswerCount",IntegerType(), True),
    StructField("Body", StringType(), True),
    StructField("ClosedDate", TimestampType(), True),
    StructField("CommentCount",IntegerType(), True),
    StructField("CommunityOwnedDate", TimestampType(), True),
    StructField("ContentLicense", StringType(), True),
    StructField("CreationDate", TimestampType(), True),
    StructField("FavoriteCount",IntegerType(), True),
    StructField("Id",IntegerType(), False),
    StructField("LastActivityDate", TimestampType(), True),
    StructField("LastEditDate", TimestampType(), True),
    StructField("LastEditorDisplayName", StringType(), True),
    StructField("LastEditorUserId",IntegerType(), True),
    StructField("OwnerDisplayName", StringType(), True),
    StructField("OwnerUserId",IntegerType(), True),
    StructField("ParentId",IntegerType(), True),
    StructField("PostTypeId",IntegerType(), True),
    StructField("Score",IntegerType(), True),
    StructField("Tags", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("ViewCount",IntegerType(), True)
])


In [None]:
Comments_df = spark.read.format("xml") \
    .option("rowTag", "row") \
    .schema(comments_schema)\
    .load(f"{input_path}/Comments.xml")

badges_df = spark.read.format("xml") \
    .option("rowTag", "row") \
    .schema(Badges_schema)\
    .load(f"{input_path}/Badges.xml")

votes_df = spark.read.format("xml") \
    .option("rowTag", "row")\
    .option("attributePrefix", "") \
    .schema(votes_schema) \
    .load(f"{input_path}/Votes.xml")
df_posts = spark.read.format("xml") \
    .option("rowTag", "row")\
    .option("attributePrefix", "") \
    .schema(posts_schema) \
    .load(f"{input_path}/Posts.xml")  
Users_df = spark.read.format("xml") \
    .option("rowTag", "row") \
    .load(f"{input_path}/Users.xml")
df_Tags = spark.read.format("xml") \
    .option("rowTag", "row")\
    .option("attributePrefix", "") \
    .load(f"{input_path}/Tags.xml") 


In [19]:
df_Tags.show()

+-----+-------------+---+--------------------+----------+
|Count|ExcerptPostId| Id|             TagName|WikiPostId|
+-----+-------------+---+--------------------+----------+
| 7844|        20258|  1|            bayesian|     20257|
|  978|        62158|  2|               prior|     62157|
|   12|         NULL|  3|         elicitation|      NULL|
|   18|         NULL|  5|         open-source|      NULL|
| 9359|         8046|  6|       distributions|      8045|
|19853|         9066|  9|    machine-learning|      9065|
| 1879|        20490| 10|             dataset|     20489|
|  999|        28276| 11|              sample|     28275|
|  517|        69287| 12|          population|     69286|
|  334|        66319| 15|         measurement|     66318|
|  406|       139243| 16|              scales|    139242|
|  265|        64387| 17|       interpolation|     64386|
|   39|         NULL| 18|       multivariable|      NULL|
| 5195|         9251| 21|               anova|      9250|
|   55|       

# Comments Table

In [None]:
#Comments_df.show()
#Comments_df.printSchema()

**Calculate Nulls**

In [38]:
'''
total_rows = Comments_df.count()

Comments_df.select(
    [(sum(when(col(c).isNull(), 1).otherwise(0)) / total_rows * 100).alias(c) for c in Comments_df.columns]
).show()
'''

'\ntotal_rows = Comments_df.count()\n\nComments_df.select(\n    [(sum(when(col(c).isNull(), 1).otherwise(0)) / total_rows * 100).alias(c) for c in Comments_df.columns]\n).show()\n'

**Renaming Columns**

In [61]:
Comments_df = Comments_df \
    .withColumnRenamed("_ContentLicense", "ContentLicense") \
    .withColumnRenamed("_CreationDate", "CreationDate") \
    .withColumnRenamed("_Id", "Id") \
    .withColumnRenamed("_PostId", "PostId") \
    .withColumnRenamed("_Score", "Score") \
    .withColumnRenamed("_Text", "Text") \
    .withColumnRenamed("_UserDisplayName", "UserDisplayName") \
    .withColumnRenamed("_UserId", "UserId")

**Dropping Columns**

In [62]:
Comments_df = Comments_df.drop('UserDisplayName')

**Formatting Date**

In [63]:
Comments_df = Comments_df.withColumn("CreationDate", to_date(col("CreationDate"),"yyyy-MM-DD"))

**Handling Nulls**

In [64]:
Comments_df = Comments_df.fillna({"UserId": -2})

In [None]:
#Comments_df.printSchema()       ########
#Comments_df.show()      #########

[Stage 1:>                                                          (0 + 1) / 1]

+---+------+------+-----+--------------------+------------+--------------+
| Id|PostId|UserId|Score|                Text|CreationDate|ContentLicense|
+---+------+------+-----+--------------------+------------+--------------+
|  1|     3|    13|    7|Could be a poster...|  2010-07-19|  CC BY-SA 2.5|
|  2|     5|    13|    0|Yes, R is nice- b...|  2010-07-19|  CC BY-SA 2.5|
|  3|     9|    13|    1|Again- why?  How ...|  2010-07-19|  CC BY-SA 2.5|
|  4|     5|    37|   11|It's mature, well...|  2010-07-19|  CC BY-SA 2.5|
|  6|    14|    23|   10|why ask the quest...|  2010-07-19|  CC BY-SA 2.5|
|  7|    18|    36|    1|also the US censu...|  2010-07-19|  CC BY-SA 2.5|
|  9|    16|    78|    1|Andrew Gelman has...|  2010-07-19|  CC BY-SA 2.5|
| 10|    23|    -2|    8|I am not sure I u...|  2010-07-19|  CC BY-SA 2.5|
| 11|    43|     5|    5|There are many R ...|  2010-07-19|  CC BY-SA 2.5|
| 12|    38|    54|    0|That's just an ex...|  2010-07-19|  CC BY-SA 2.5|
| 13|    20|    24|    2|

                                                                                

In [65]:
#Comments_df.write.mode("overwrite").parquet(f"{output_path}/Comments_Silver")   # run only the first time 

In [None]:
path = f"{output_path}/Comments"

# Check if path exists
if os.path.exists(path):
    existing_comments_df = spark.read.parquet(path)
else:
    existing_comments_df = Comments_df.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_comments_df = Comments_df.join(
    existing_comments_df, 
    "Id", 
    "left_anti"
)

# Write with append
new_comments_df.write.mode("append").parquet(path)

                                                                                

# Badges Table

In [None]:
#badges_df.show()        ######

**Calculate Nulls**

In [None]:
#null_perc=badges_df.select([((count(when(col(c).isNull(),c)) / badges_df.count()) * 100).alias(c) for c in badges_df.columns]) ########
#null_perc.show()       #######

**Dropping Columns**

In [57]:
#badges_df.printSchema()

In [58]:
Badge_Disc_Dim=badges_df.drop(*["_TagBased","_Date","_UserId",'_Id'])

In [41]:
#Badge_Disc_Dim.show()

**Renaming Columns Names for Badge Disc Table** 

In [59]:
for col_name in Badge_Disc_Dim.columns:
    Badge_Disc_Dim = Badge_Disc_Dim.withColumnRenamed(col_name, col_name.lstrip("_"))

In [43]:
#Badge_Disc_Dim.printSchema()        ###

**Identfying the distinct badges**

In [48]:
Badge_Disc_Dim.show()

Py4JJavaError: An error occurred while calling o253.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:120)
	at org.apache.spark.SparkContext.$anonfun$newAPIHadoopFile$2(SparkContext.scala:1257)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:806)
	at org.apache.spark.SparkContext.newAPIHadoopFile(SparkContext.scala:1256)
	at com.databricks.spark.xml.util.XmlFile$.withCharset(XmlFile.scala:54)
	at com.databricks.spark.xml.DefaultSource.$anonfun$createRelation$1(DefaultSource.scala:77)
	at com.databricks.spark.xml.XmlRelation.buildScan(XmlRelation.scala:55)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$6(DataSourceStrategy.scala:335)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:362)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:441)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:361)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:335)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [60]:
Badge_Disc_Dim = Badge_Disc_Dim.select('Name','Class').dropDuplicates()

In [18]:
#Badge_Disc_Dim.show()
#Badge_Disc_Dim.count()


**Adding SK**

In [61]:
#window_spec = Window.orderBy("Name")  

#Badge_Disc_Dim = Badge_Disc_Dim.withColumn("Badge_Desc_Id", row_number().over(window_spec))
Badge_Disc_Dim = Badge_Disc_Dim.withColumn("Badge_Desc_Id", monotonically_increasing_id())


**Rearrange coloumns**

In [62]:
Badge_Disc_Dim = Badge_Disc_Dim.select('Badge_Desc_Id','Name','Class')

In [37]:
#Badge_Disc_Dim.groupBy("Class").count().show()          #####


In [63]:
Badge_Disc_Dim = Badge_Disc_Dim.withColumn("Class",
    when(col("Class") == 1, lit("Gold"))
    .when(col("Class") == 2, lit("Silver"))
    .when(col("Class") == 3, lit("Bronze")))

In [18]:
Badge_Disc_Dim.show()   ###

25/04/08 06:11:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 06:11:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 06:11:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 1:>                                                          (0 + 1) / 1]

25/04/08 06:11:49 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1744091952685_0001_01_000003 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 06:11:49.096]Container killed on request. Exit code is 137
[2025-04-08 06:11:49.144]Container exited with a non-zero exit code 137. 
[2025-04-08 06:11:49.159]Killed by external signal
.
25/04/08 06:11:49 ERROR YarnScheduler: Lost executor 2 on worker3: Container from a bad node: container_1744091952685_0001_01_000003 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 06:11:49.096]Container killed on request. Exit code is 137
[2025-04-08 06:11:49.144]Container exited with a non-zero exit code 137. 
[2025-04-08 06:11:49.159]Killed by external signal
.
25/04/08 06:12:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 06:12:00 WARN Wi

                                                                                

+-------------+--------------+------+
|Badge_Desc_Id|          Name| Class|
+-------------+--------------+------+
|            1|      Altruist|Bronze|
|            2|    Analytical|Bronze|
|            3|     Announcer|Bronze|
|            4| Archaeologist|Silver|
|            5|Autobiographer|Bronze|
|            6|    Benefactor|Bronze|
|            7|          Beta|Silver|
|            8|       Booster|Silver|
|            9|        Caucus|Bronze|
|           10|        Census|Silver|
|           11|Citizen Patrol|Bronze|
|           12|    Civic Duty|Silver|
|           13|       Cleanup|Bronze|
|           14|   Commentator|Bronze|
|           15|     Constable|  Gold|
|           16|   Constituent|Silver|
|           17|    Convention|Silver|
|           18|   Copy Editor|  Gold|
|           19|        Critic|Bronze|
|           20|       Curious|Bronze|
+-------------+--------------+------+
only showing top 20 rows

25/04/08 06:12:25 ERROR YarnScheduler: Lost executor 1 on work

In [None]:
path = f"{output_path}/Badge_Disc_Dim_Silver"
# Check if path exists
if os.path.exists(path):
    existing_Badge_Disc_Dim_df = spark.read.parquet(path)
else:
    existing_Badge_Disc_Dim_df = Badge_Disc_Dim.where(lit(False))
    #spark.createDataFrame(spark.sparkContext.emptyRDD(), badge_disc_dim_schema)

# Identify new records not already in the existing data
new_Badge_Disc_Dim_df = Badge_Disc_Dim.join(
    existing_Badge_Disc_Dim_df, 
    "Badge_Desc_Id", 
    "left_anti"
)

# Write with append
new_Badge_Disc_Dim_df.write.mode("append").parquet(path)


[Stage 1:>                                                          (0 + 1) / 1]

25/04/08 08:11:10 ERROR YarnScheduler: Lost executor 1 on worker2: Container from a bad node: container_1744096611110_0002_01_000002 on host: worker2. Exit status: 137. Diagnostics: [2025-04-08 08:11:10.338]Container killed on request. Exit code is 137
[2025-04-08 08:11:10.342]Container exited with a non-zero exit code 137. 
[2025-04-08 08:11:10.343]Killed by external signal
.
25/04/08 08:11:10 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2) (worker2 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container from a bad node: container_1744096611110_0002_01_000002 on host: worker2. Exit status: 137. Diagnostics: [2025-04-08 08:11:10.338]Container killed on request. Exit code is 137
[2025-04-08 08:11:10.342]Container exited with a non-zero exit code 137. 
[2025-04-08 08:11:10.343]Killed by external signal
.
25/04/08 08:11:10 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container f

[Stage 1:>                                                          (0 + 1) / 1]

25/04/08 08:11:23 ERROR YarnScheduler: Lost executor 3 on worker3: Container from a bad node: container_1744096611110_0002_01_000004 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 08:11:23.783]Container killed on request. Exit code is 137
[2025-04-08 08:11:23.784]Container exited with a non-zero exit code 137. 
[2025-04-08 08:11:23.785]Killed by external signal
.
25/04/08 08:11:23 WARN TaskSetManager: Lost task 0.1 in stage 1.0 (TID 3) (worker3 executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container from a bad node: container_1744096611110_0002_01_000004 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 08:11:23.783]Container killed on request. Exit code is 137
[2025-04-08 08:11:23.784]Container exited with a non-zero exit code 137. 
[2025-04-08 08:11:23.785]Killed by external signal
.
25/04/08 08:11:23 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 3 for reason Container f

[Stage 1:>                                                          (0 + 1) / 1]

25/04/08 08:11:30 ERROR TransportClient: Failed to send RPC RPC 5537226630131734778 to /172.28.1.5:57486: io.netty.channel.StacklessClosedChannelException
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
25/04/08 08:11:30 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 4 at RPC address 172.28.1.3:54860, but got no response. Marking as agent lost.
java.io.IOException: Failed to send RPC RPC 5537226630131734778 to /172.28.1.5:57486: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:392)
	at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyLi

[Stage 3:>                                                          (0 + 1) / 1]

25/04/08 08:12:23 ERROR TransportClient: Failed to send RPC RPC 5874819089445076357 to /172.28.1.4:39762: io.netty.channel.StacklessClosedChannelException
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
25/04/08 08:12:23 ERROR TransportRequestHandler: Error sending result RpcResponse[requestId=7898194054997286435,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=312 cap=312]]] to /172.28.1.4:39762; closing connection
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
25/04/08 08:12:23 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 1 from block manager BlockManagerId(6, worker2, 39813, None)
java.io.IOException: Failed to send RPC RPC 5874819089445076357 to /172.28.1.4:39762: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.Transpo

[Stage 2:>                                                          (0 + 1) / 1]

25/04/08 08:12:49 ERROR YarnScheduler: Lost executor 5 on worker3: Container from a bad node: container_1744096611110_0002_02_000002 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 08:12:49.738]Container killed on request. Exit code is 137
[2025-04-08 08:12:49.741]Container exited with a non-zero exit code 137. 
[2025-04-08 08:12:49.742]Killed by external signal
.
25/04/08 08:12:49 WARN TaskSetManager: Lost task 0.0 in stage 2.1 (TID 8) (worker3 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container from a bad node: container_1744096611110_0002_02_000002 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 08:12:49.738]Container killed on request. Exit code is 137
[2025-04-08 08:12:49.741]Container exited with a non-zero exit code 137. 
[2025-04-08 08:12:49.742]Killed by external signal
.
25/04/08 08:12:49 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 5 for reason Container f

                                                                                

25/04/08 08:18:27 ERROR YarnScheduler: Lost executor 7 on worker1: Container from a bad node: container_1744096611110_0002_02_000006 on host: worker1. Exit status: 137. Diagnostics: [2025-04-08 08:18:27.060]Container killed on request. Exit code is 137
[2025-04-08 08:18:27.061]Container exited with a non-zero exit code 137. 
[2025-04-08 08:18:27.062]Killed by external signal
.
25/04/08 08:18:27 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 7 for reason Container from a bad node: container_1744096611110_0002_02_000006 on host: worker1. Exit status: 137. Diagnostics: [2025-04-08 08:18:27.060]Container killed on request. Exit code is 137
[2025-04-08 08:18:27.061]Container exited with a non-zero exit code 137. 
[2025-04-08 08:18:27.062]Killed by external signal
.


**Formatting Date**

In [65]:
badges_df = badges_df.withColumn("AssignedDate", to_date("_Date", "yyyy-MM-dd"))

In [66]:
badges_df.printSchema()

root
 |-- _Class: integer (nullable = true)
 |-- _Date: timestamp (nullable = true)
 |-- _Id: integer (nullable = false)
 |-- _Name: string (nullable = true)
 |-- _TagBased: boolean (nullable = true)
 |-- _UserId: integer (nullable = true)
 |-- AssignedDate: date (nullable = true)



In [32]:
#badges_df.show()

[Stage 5:>                                                          (0 + 1) / 1]

+------+--------------------+---+-------+---------+-------+------------+
|_Class|               _Date|_Id|  _Name|_TagBased|_UserId|AssignedDate|
+------+--------------------+---+-------+---------+-------+------------+
|     3|2010-07-19 19:39:...|  1|Teacher|    false|      5|  2010-07-19|
|     3|2010-07-19 19:39:...|  2|Teacher|    false|      6|  2010-07-19|
|     3|2010-07-19 19:39:...|  3|Teacher|    false|      8|  2010-07-19|
|     3|2010-07-19 19:39:...|  4|Teacher|    false|     23|  2010-07-19|
|     3|2010-07-19 19:39:...|  5|Teacher|    false|     36|  2010-07-19|
|     3|2010-07-19 19:39:...|  6|Teacher|    false|     37|  2010-07-19|
|     3|2010-07-19 19:39:...|  7|Teacher|    false|     50|  2010-07-19|
|     3|2010-07-19 19:39:...|  8|Teacher|    false|     55|  2010-07-19|
|     3|2010-07-19 19:39:...|  9|Student|    false|      5|  2010-07-19|
|     3|2010-07-19 19:39:...| 10|Student|    false|      8|  2010-07-19|
|     3| 2010-07-19 19:39:08| 11|Student|    false|

                                                                                

**Renaming Columns for Badges Fact Table**

In [68]:
badges_df = badges_df.withColumnRenamed("_Date", "date") \
                    .withColumnRenamed("_Id", "AssingingBadge_BK") \
                    .withColumnRenamed("_UserId", "User_fk") \
                    .withColumnRenamed("_TagBased", "TagBased")\
                    .withColumnRenamed("_Name", "Name")\
                    


In [69]:
badges_df = badges_df.join(
    Badge_Disc_Dim,
    on = "Name",
    how = "left"
)

In [31]:
#badges_df.show()

25/04/08 07:32:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 07:32:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 10:>                 (0 + 1) / 1][Stage 11:>                 (0 + 1) / 1]

25/04/08 07:32:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 07:32:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 11:>                                                         (0 + 1) / 1]

25/04/08 07:33:20 ERROR YarnScheduler: Lost executor 3 on worker3: Container from a bad node: container_1744096611110_0001_01_000005 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 07:33:20.487]Container killed on request. Exit code is 137
[2025-04-08 07:33:20.488]Container exited with a non-zero exit code 137. 
[2025-04-08 07:33:20.489]Killed by external signal
.
25/04/08 07:33:20 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 3 for reason Container from a bad node: container_1744096611110_0001_01_000005 on host: worker3. Exit status: 137. Diagnostics: [2025-04-08 07:33:20.487]Container killed on request. Exit code is 137
[2025-04-08 07:33:20.488]Container exited with a non-zero exit code 137. 
[2025-04-08 07:33:20.489]Killed by external signal
.
25/04/08 07:33:20 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 7) (worker3 executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container 

[Stage 11:>                                                         (0 + 1) / 1]

25/04/08 07:33:27 ERROR TransportClient: Failed to send RPC RPC 5772046530023715220 to /172.28.1.3:37200: io.netty.channel.StacklessClosedChannelException
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
25/04/08 07:33:27 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 4 at RPC address 172.28.1.4:37996, but got no response. Marking as agent lost.
java.io.IOException: Failed to send RPC RPC 5772046530023715220 to /172.28.1.3:37200: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:392)
	at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyLi

[Stage 11:>                                                         (0 + 1) / 1]

25/04/08 07:33:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 07:33:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 07:33:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/08 07:33:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

+---------+------+--------------------+-----------------+--------+-------+------------+-------------+------+
|     Name|_Class|                date|AssingingBadge_BK|TagBased|User_fk|AssignedDate|Badge_Desc_Id| Class|
+---------+------+--------------------+-----------------+--------+-------+------------+-------------+------+
|  Student|     3|2010-07-19 19:39:...|                9|   false|      5|  2010-07-19|           77|Bronze|
|  Student|     3|2010-07-19 19:39:...|               10|   false|      8|  2010-07-19|           77|Bronze|
|  Student|     3| 2010-07-19 19:39:08|               11|   false|     13|  2010-07-19|           77|Bronze|
|  Student|     3|2010-07-19 19:39:...|               12|   false|     18|  2010-07-19|           77|Bronze|
|  Student|     3|2010-07-19 19:39:...|               13|   false|     23|  2010-07-19|           77|Bronze|
|  Student|     3|2010-07-19 19:39:...|               14|   false|     24|  2010-07-19|           77|Bronze|
|  Student|     3|2

In [70]:
badges_fact = badges_df.select("AssingingBadge_BK","User_fk","Badge_Desc_Id", "TagBased", "Assigneddate")

In [46]:
#badges_fact.show()

                                                                                

+-----------------+-------+-------------+--------+------------+
|AssingingBadge_BK|User_fk|Badge_Desc_Id|TagBased|Assigneddate|
+-----------------+-------+-------------+--------+------------+
|                9|      5|          425|   false|  2010-07-19|
|               10|      8|          425|   false|  2010-07-19|
|               11|     13|          425|   false|  2010-07-19|
|               12|     18|          425|   false|  2010-07-19|
|               13|     23|          425|   false|  2010-07-19|
|               14|     24|          425|   false|  2010-07-19|
|               16|     59|          425|   false|  2010-07-19|
|               17|     66|          425|   false|  2010-07-19|
|               18|     69|          425|   false|  2010-07-19|
|               19|     75|          425|   false|  2010-07-19|
|                1|      5|          380|   false|  2010-07-19|
|                2|      6|          380|   false|  2010-07-19|
|                3|      8|          380

In [None]:
#badges_fact.write.mode("overwrite").parquet(f"{output_path}/Badges_Fact_Silver")
path = f"{output_path}/Badges_Fact"

# Check if path exists
if os.path.exists(path):
    existing_badges_fact_df = spark.read.parquet(path)
else:
    existing_badges_fact_df = badges_fact.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_badges_fact_df = badges_fact.join(
    existing_badges_fact_df, 
    "AssingingBadge_BK",  
    "left_anti"
)

# Write with append
new_badges_fact_df.write.mode("append").parquet(path)

Py4JJavaError: An error occurred while calling o446.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:120)
	at org.apache.spark.SparkContext.$anonfun$newAPIHadoopFile$2(SparkContext.scala:1257)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:806)
	at org.apache.spark.SparkContext.newAPIHadoopFile(SparkContext.scala:1256)
	at com.databricks.spark.xml.util.XmlFile$.withCharset(XmlFile.scala:54)
	at com.databricks.spark.xml.DefaultSource.$anonfun$createRelation$1(DefaultSource.scala:77)
	at com.databricks.spark.xml.XmlRelation.buildScan(XmlRelation.scala:55)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$6(DataSourceStrategy.scala:335)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:362)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:441)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:361)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:335)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


# Votes Table

**Calculate Nulls**

In [None]:
'''null_perc = votes_df.select(
    (count(when(col("UserId").isNull(), 1)) / count(lit(1)) * 100).alias("UserId_null_percentage"),
    (count(when(col("BountyAmount").isNull(), 1)) / count(lit(1)) * 100).alias("BountyAmount_null_percentage")
).collect()[0]  
null_perc
'''


                                                                                

**Dropping columns**

In [34]:
votes_df = votes_df.drop("UserId", "BountyAmount")

**joining votes with posts**

In [16]:
df_votes_with_owner = votes_df.join(
    df_posts.select("Id", "OwnerUserId"),
    votes_df.PostId == df_posts.Id,
    "left"
)
df_votes_with_owner = df_votes_with_owner.drop(df_posts["Id"])


In [17]:
df_votes_with_owner.show()

[Stage 9:>                                                          (0 + 1) / 1]

+-------+------+----------+-------------------+-----------+
|     Id|PostId|VoteTypeId|       CreationDate|OwnerUserId|
+-------+------+----------+-------------------+-----------+
|     20|     1|         2|2010-07-19 00:00:00|          8|
|      1|     3|         2|2010-07-19 00:00:00|         18|
|      5|     3|         2|2010-07-19 00:00:00|         18|
|     10|     3|         2|2010-07-19 00:00:00|         18|
|     21|     3|         2|2010-07-19 00:00:00|         18|
|      3|     5|         2|2010-07-19 00:00:00|         23|
|      4|     5|         2|2010-07-19 00:00:00|         23|
|     11|     5|         2|2010-07-19 00:00:00|         23|
|     15|     5|         2|2010-07-19 00:00:00|         23|
|     22|     5|         2|2010-07-19 00:00:00|         23|
|     12|     6|         2|2010-07-19 00:00:00|          5|
|     17|     6|         2|2010-07-19 00:00:00|          5|
|     23|     9|         2|2010-07-19 00:00:00|         50|
|     26|    13|         2|2010-07-19 00

                                                                                

**Renaming columns**

In [18]:
df_votes_with_owner = df_votes_with_owner.withColumnRenamed("OwnerUserId", "PostOwnerId")

**Formatting Date**

In [19]:
df_votes_with_owner = df_votes_with_owner.withColumn("CreationDate", date_format(col("CreationDate"), "yyyy-MM-dd"))


In [20]:
df_votes_with_owner.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- PostId: integer (nullable = true)
 |-- VoteTypeId: integer (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- PostOwnerId: integer (nullable = true)



In [21]:
df_votes_final = df_votes_with_owner.withColumn(
    'VoteTypeName',
    when(df_votes_with_owner['VoteTypeId'] == 1, 'AcceptedByOriginator')
    .when(df_votes_with_owner['VoteTypeId'] == 2, 'Upvote')
    .when(df_votes_with_owner['VoteTypeId'] == 3, 'Downvote')
    .when(df_votes_with_owner['VoteTypeId'] == 4, 'Offensive')
    .when(df_votes_with_owner['VoteTypeId'] == 5, 'Favorite')
    .when(df_votes_with_owner['VoteTypeId'] == 6, 'Close')
    .when(df_votes_with_owner['VoteTypeId'] == 7, 'Reopen')
    .when(df_votes_with_owner['VoteTypeId'] == 8, 'BountyStart')
    .when(df_votes_with_owner['VoteTypeId'] == 9, 'BountyClose')
    .when(df_votes_with_owner['VoteTypeId'] == 10, 'Deletion')
    .when(df_votes_with_owner['VoteTypeId'] == 11, 'Undeletion')
    .when(df_votes_with_owner['VoteTypeId'] == 12, 'Migration')
    .otherwise('Unknown') 
)


In [None]:
#df_votes_final.show()   ###



+-------+------+----------+------------+-----------+------------+
|     Id|PostId|VoteTypeId|CreationDate|PostOwnerId|VoteTypeName|
+-------+------+----------+------------+-----------+------------+
|     20|     1|         2|  2010-07-19|          8|      Upvote|
|      1|     3|         2|  2010-07-19|         18|      Upvote|
|      5|     3|         2|  2010-07-19|         18|      Upvote|
|     10|     3|         2|  2010-07-19|         18|      Upvote|
|     21|     3|         2|  2010-07-19|         18|      Upvote|
|      3|     5|         2|  2010-07-19|         23|      Upvote|
|      4|     5|         2|  2010-07-19|         23|      Upvote|
|     11|     5|         2|  2010-07-19|         23|      Upvote|
|     15|     5|         2|  2010-07-19|         23|      Upvote|
|     22|     5|         2|  2010-07-19|         23|      Upvote|
|     12|     6|         2|  2010-07-19|          5|      Upvote|
|     17|     6|         2|  2010-07-19|          5|      Upvote|
|     23| 

                                                                                

In [None]:
#df_votes_final.write.mode("overwrite").parquet(f"{output_path}/Votes")
path = f"{output_path}/Votes_Fact"

# Check if path exists
if os.path.exists(path):
    existing_votes_df = spark.read.parquet(path)
else:
    existing_votes_df = df_votes_final.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_votes_df = df_votes_final.join(
    existing_votes_df, 
    "Id",  # Ensure this is the correct join key
    "left_anti"
)

# Write with append
new_votes_df.write.mode("append").parquet(path)


                                                                                

**Creating Votes types Dim**

In [25]:
vote_types_data = [
    Row(VoteTypeId=1, VoteTypeName='AcceptedByOriginator', Description='Marks an answer as accepted by the question author.'),
    Row(VoteTypeId=2, VoteTypeName='Upvote', Description='Upvote given to a question or answer.'),
    Row(VoteTypeId=3, VoteTypeName='Downvote', Description='Downvote given to a question or answer.'),
    Row(VoteTypeId=4, VoteTypeName='Offensive', Description='Flags a post as offensive or spam.'),
    Row(VoteTypeId=5, VoteTypeName='Favorite', Description='Indicates a user has favorited a question.'),
    Row(VoteTypeId=6, VoteTypeName='Close', Description='Suggests that a question should be closed.'),
    Row(VoteTypeId=7, VoteTypeName='Reopen', Description='Suggests that a closed question should be reopened.'),
    Row(VoteTypeId=8, VoteTypeName='BountyStart', Description='Indicates that a bounty has been added to a question.'),
    Row(VoteTypeId=9, VoteTypeName='BountyClose', Description='Indicates that a bounty has been awarded to an answer.'),
    Row(VoteTypeId=10, VoteTypeName='Deletion', Description='Indicates that a post has been deleted.'),
    Row(VoteTypeId=11, VoteTypeName='Undeletion', Description='Indicates that a post has been undeleted.'),
    Row(VoteTypeId=12, VoteTypeName='Migration', Description='Indicates that a question has been migrated to another Stack Exchange site.')
]

# Create a DataFrame
df_Vote_Types_Dim = spark.createDataFrame(vote_types_data)

Traceback (most recent call last):
  File "/opt/spark/python/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 546, in _dynamic_function_reduce
    state = _function_getstate(func)
            ^^^^^^^^^^^^^^^

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [62]:
df_Vote_Types_Dim.show()

NameError: name 'df_Vote_Types_Dim' is not defined

In [None]:
path = f"{output_path}/Vote_Types_Dim"

# Check if path exists
if os.path.exists(path):
    existing_vote_types_df = spark.read.parquet(path)
else:
    existing_vote_types_df = df_Vote_Types_Dim.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_vote_types_df = df_Vote_Types_Dim.join(
    existing_vote_types_df, 
    "VoteTypeId",  # Ensure this is the correct join key
    "left_anti"
)

# Write with append while maintaining a single output file
new_vote_types_df.write.mode("append").parquet(path)

# Posts

In [18]:
df_posts.printSchema()

root
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- CommentCount: integer (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- Id: integer (nullable = false)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- OwnerDisplayName: string (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: integer (nullable = true)



## Filtering posts to Questions and answers 

In [31]:
df_Questions = df_posts.where(col('PostTypeId') == 1)

In [9]:
df_Answers = df_posts.where(col('PostTypeId') == 2)

## Working with Questions 

In [33]:
df_Questions_Filtered_cols = df_Questions.select('Id','OwnerUserId','CreationDate','LastActivityDate','AcceptedAnswerId','Body','Title','Tags','Score','ViewCount','AnswerCount','CommentCount')

#### Dealing with nulls 

In [None]:
#null_counts = df_Questions_Filtered_cols.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_Questions_Filtered_cols.columns])
#null_counts.show()

#### UserID Nulls to -2 and Accepted Answer ID nulls to -1

In [35]:
df_Questions_Handling_Nulls = df_Questions_Filtered_cols.fillna({
    'OwnerUserId':'-2',
    'AcceptedAnswerId': '-1'
})

In [None]:
#null_counts_again = df_Questions_Handling_Nulls.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_Questions_Handling_Nulls.columns])
#null_counts_again.show()

#### Handling Duplication 

In [None]:
#duplicates = df_Questions_Handling_Nulls.groupBy('OwnerUserId','Body').count().filter("count > 1")
#duplicates.show()
#duplicates.count()

In [41]:
df_Questions_dropping_duplicates=df_Questions_Handling_Nulls.dropDuplicates(['OwnerUserId','Body'])

In [None]:
#duplicates_validation = df_Questions_dropping_duplicates.groupBy('OwnerUserId','Body').count().filter("count > 1")
#duplicates_validation.show()

#### Converting Date types 

In [43]:
df_Questions_Date_only =  df_Questions_dropping_duplicates.withColumn("CreationDate", to_date(col("CreationDate"),"yyyy-MM-DD"))
df_Questions_Date_only = df_Questions_Date_only.withColumn("LastActivityDate", to_date(col("LastActivityDate"),"yyyy-MM-DD"))


In [44]:
#df_Questions_Date_only.collect()
#df_Questions_Date_only.limit(10).show(truncate=False)

### Handling Body (HTML to Text )

In [22]:
def html_to_text(html):
    if html is None:
        return None
    # Parse the HTML and extract the text content
    soup = BeautifulSoup(html, "html.parser")
    return soup.get_text()

In [23]:
html_to_text_udf = udf(html_to_text, StringType())


In [47]:
df_Questions_HTMLToText = df_Questions_Date_only.withColumn("Body", html_to_text_udf(col("Body")))


In [48]:
df_Questions_refined_text = df_Questions_HTMLToText.withColumn(
    "Body",
    trim(
        regexp_replace(
            lower(
                regexp_replace("Body", "\\s+", " ")
            ),
            "[^a-zA-Z0-9\\s]",
            ""
        )
    )
)

#### Dealing with Tags

In [49]:
df_Questions_final = df_Questions_refined_text.withColumn("Tags", expr("substring(Tags, 2, length(Tags) - 2)"))

In [50]:
#df_Questions_final = df_Questions_final.withColumn("Tags", split(df_Questions_final["Tags"], "\><"))
df_Questions_final = df_Questions_final.withColumn("Tags", split(df_Questions_final["Tags"], "\\><"))

In [39]:
df_Questions_final.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- OwnerUserId: integer (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- LastActivityDate: date (nullable = true)
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Score: integer (nullable = true)
 |-- ViewCount: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- CommentCount: integer (nullable = true)



In [None]:
#df_Questions_final.select(col('Tags')).show(truncate=False)    

[Stage 3:>                                                          (0 + 2) / 6]

In [51]:
df_Questions_final = df_Questions_final.withColumnRenamed("Id","QuestionId")

In [None]:
#df_Questions_final.show()

In [None]:
#df_Questions_final.write.mode("overwrite").parquet(f"{output_path}/Questions_Silver")
path = f"{output_path}/Questions"

# Check if path exists
if os.path.exists(path):
    existing_questions_df = spark.read.parquet(path)
else:
    existing_questions_df = df_Questions_final.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_questions_df = df_Questions_final.join(
    existing_questions_df, 
    "QuestionId",  # Ensure this is the correct join key
    "left_anti"
)

# Write with append
new_questions_df.write.mode("append").parquet(path)


In [None]:
#df_Questions_final.printSchema()

## Working With Answers

In [None]:
#df_Answers.printSchema()

In [None]:
df_Answers_Filtered_cols = df_Answers.select('Id','ParentId','OwnerUserId','CreationDate','LastActivityDate','Body','Score','CommentCount')

In [None]:
#df_Answers_Filtered_cols.show()         

+---+--------+-----------+--------------------+--------------------+--------------------+-----+------------+
| Id|ParentId|OwnerUserId|        CreationDate|    LastActivityDate|                Body|Score|CommentCount|
+---+--------+-----------+--------------------+--------------------+--------------------+-----+------------+
|  5|       3|         23|2010-07-19 19:14:...|2010-07-19 19:21:...|<p>The R-project<...|   90|           3|
|  9|       3|         50|2010-07-19 19:16:...|2010-07-19 19:16:...|<p><a href="http:...|   15|           3|
| 12|       7|          5|2010-07-19 19:18:...|2010-07-19 19:18:...|<p>See my respons...|   24|           1|
| 13|       6|         23|2010-07-19 19:18:...|2010-07-19 19:18:...|<p>Machine Learni...|   27|           6|
| 14|       3|         36|2010-07-19 19:19:...|2010-07-19 19:19:...|<p>I second that ...|    6|           1|
| 15|       1|          6|2010-07-19 19:19:...|2010-07-19 19:19:...|<p>John Cook give...|   24|           0|
| 16|       3|     

                                                                                

### Dealing with nulls

In [None]:
#null_counts = df_Answers_Filtered_cols.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_Answers_Filtered_cols.columns])
#null_counts.show()

In [13]:
df_Answers_Handling_Nulls = df_Answers_Filtered_cols.fillna({
    'OwnerUserId':'-2',
})

In [14]:
#null_counts = df_Answers_Handling_Nulls.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_Answers_Handling_Nulls.columns])
#null_counts.show()

### Handling Duplicates 

In [None]:
#duplicates = df_Answers_Handling_Nulls.groupBy('OwnerUserId','Body','ParentId').count().filter("count > 1")


In [16]:
df_Answers_Handling_Nulls.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- ParentId: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- Body: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- CommentCount: integer (nullable = true)



In [17]:
df_Answers_Date_only =  df_Answers_Handling_Nulls.withColumn("CreationDate", to_date(col("CreationDate"),"yyyy-MM-DD"))
df_Answers_Date_only = df_Answers_Date_only.withColumn("LastActivityDate", to_date(col("LastActivityDate"),"yyyy-MM-DD"))

In [18]:
df_Answers_Date_only.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- ParentId: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- LastActivityDate: date (nullable = true)
 |-- Body: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- CommentCount: integer (nullable = true)



### Handling Body (HTML to Text )

In [24]:
df_Answers_HTMLToText = df_Answers_Date_only.withColumn("Body", html_to_text_udf(col("Body")))


In [25]:
df_Answers_refined_text = df_Answers_HTMLToText.withColumn(
    "Body",
    trim(
        regexp_replace(
            lower(
                regexp_replace("Body", "\\s+", " ")
            ),
            "[^a-zA-Z0-9\\s]",
            ""
        )
    )
)

In [108]:
df_Answers_refined_text.select('Body').show(truncate = False)

25/04/06 19:56:18 WARN TaskSetManager: Lost task 0.0 in stage 100.0 (TID 111) (worker2 executor 4): java.io.IOException: Cannot run program "/home/jupyter/.venv/bin/python": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:216)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spa

Py4JJavaError: An error occurred while calling o766.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 100.0 failed 4 times, most recent failure: Lost task 0.3 in stage 100.0 (TID 114) (worker2 executor 4): java.io.IOException: Cannot run program "/home/jupyter/.venv/bin/python": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:216)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
	at java.lang.ProcessImpl.start(ProcessImpl.java:134)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Cannot run program "/home/jupyter/.venv/bin/python": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:216)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
	at java.lang.ProcessImpl.start(ProcessImpl.java:134)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 26 more


In [26]:
df_Answers_final = df_Answers_refined_text.withColumnRenamed("Id","AnswerId")


In [None]:
path = f"{output_path}/Answers"

# Check if path exists
if os.path.exists(path):
    existing_answers_df = spark.read.parquet(path)
else:
    existing_answers_df = df_Answers_final.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_answers_df = df_Answers_final.join(
    existing_answers_df, 
    "AnswerId",  
    "left_anti"
)

# Write with append
new_answers_df.write.mode("append").parquet(path)

25/04/07 00:43:43 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 21) (worker3 executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo

Py4JJavaError: An error occurred while calling o245.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 24) (worker3 executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Cannot run program "/home/jupyter/.venv/bin/python": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:216)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:331)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:338)
	... 9 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
	at java.lang.ProcessImpl.start(ProcessImpl.java:134)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
	... 42 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.IOException: Cannot run program "/home/jupyter/.venv/bin/python": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:216)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:331)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:338)
	... 9 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
	at java.lang.ProcessImpl.start(ProcessImpl.java:134)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 31 more


# Tags

In [11]:
df_Tags.show()

+----------------+-----------+----+----------+------------+------------------+--------------+------------+-------------+---+----------------+------------+---------------------+----------------+----------------+-----------+--------+----------+-----+----+-----+---------+
|AcceptedAnswerId|AnswerCount|Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|CreationDate|FavoriteCount| Id|LastActivityDate|LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|Tags|Title|ViewCount|
+----------------+-----------+----+----------+------------+------------------+--------------+------------+-------------+---+----------------+------------+---------------------+----------------+----------------+-----------+--------+----------+-----+----+-----+---------+
|            NULL|       NULL|NULL|      NULL|        NULL|              NULL|          NULL|        NULL|         NULL|  1|            NULL|        NULL|                 NULL|            NU

In [10]:
df_tags_with_desc = df_Tags.join(
    df_posts.select("Id", "Body"),  # Include "Id" for the join condition
    df_Tags.ExcerptPostId == df_posts.Id,  # Use df_Tags, not df_votes
    "left"
)



AttributeError: 'DataFrame' object has no attribute 'ExcerptPostId'

In [None]:
#df_tags_with_desc.show()

In [None]:
df_tags_with_desc = df_tags_with_desc.drop(df_posts.Id)
df_tags_with_desc = df_tags_with_desc.withColumnRenamed('Body','TagDesc')
df_tags_with_desc = df_tags_with_desc.withColumnRenamed('ExcerptPostId','TagDescPostId')

In [None]:
#df_tags_with_desc.show(truncate=False)

In [None]:

#df_tags_with_desc.write.mode("overwrite").parquet(f"{output_path}/Tags_Silver")
path = f"{output_path}/Tags"

# Check if path exists
if os.path.exists(path):
    existing_tags_df = spark.read.parquet(path)
else:
    existing_tags_df = df_tags_with_desc.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_tags_df = df_tags_with_desc.join(
    existing_tags_df, 
    "Id", 
    "left_anti"
)

# Write with append
new_tags_df.write.mode("append").parquet(path)



# Users

In [None]:


# calculate nulls in each column

#null_perc=Users_df.select([((count(when(col(c).isNull(),c)) / Users_df.count()) * 100).alias(c) for c in Users_df.columns])
#null_perc.show()


In [None]:

Users_df=Users_df.drop(*["_Location","_AccountId","_AboutMe","_WebsiteURL"])


In [None]:

#Users_df.show()


In [None]:

# Renaming Columns Names
for col_name in Users_df.columns:
    Users_df = Users_df.withColumnRenamed(col_name, col_name.lstrip("_"))

Users_df=Users_df.withColumnRenamed("Id","UsersId_BK")


In [None]:

# check if thier is duplicates
#duplicate_count = Users_df.groupBy(Users_df.columns).count().filter(col("count") > 1)
#duplicate_count.show()


In [None]:

Users_df.printSchema()


In [None]:

# convert to date
Users_df = Users_df.withColumn("CreationDate", to_date(col("CreationDate"), "yyyy-MM-dd")).withColumn("LastAccessDate", to_date(col("LastAccessDate"), "yyyy-MM-dd"))

#convert to Integer
columns_to_convert = ["DownVotes", "UpVotes", "Views","Reputation","UsersId_BK"]
for col_name in columns_to_convert:
    Users_df = Users_df.withColumn(col_name, col(col_name).cast(IntegerType()))


In [None]:

#Users_df.printSchema()


In [None]:

#Users_df.write.mode("overwrite").parquet(f"{output_Path}/Users_Silver")
path = f"{output_Path}/Users"

# Check if path exists
if os.path.exists(path):
    existing_users_df = spark.read.parquet(path)
else:
    existing_users_df = Users_df.where(lit(False))  # Creates an empty DataFrame with the same schema

# Identify new records not already in the existing data
new_users_df = Users_df.join(
    existing_users_df, 
    "UsersId_BK",  # Ensure this is the correct join key
    "left_anti"
)

# Write with append
new_users_df.write.mode("append").parquet(path)


In [20]:
spark.stop()