In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, try_to_timestamp, row_number, sum as spark_sum
from pyspark.sql.window import Window

# Build (or reuse) the notebook's Spark session, with driver memory set to 12g.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "12g")
    .appName("Apache Jira Issues")
    .getOrCreate()
)

# Drop anything still cached in the session before starting.
spark.catalog.clearCache()

In [2]:
# Read the raw changelog CSV: the header row supplies the column names and
# column types are inferred from the data.
changelog = (
    spark.read
    .option("maxColumns", 100000)
    .csv("./apache/changelog.csv", header=True, inferSchema=True)
)

# Cleaning pipeline:
#   1. discard rows with a null 'field'
#   2. parse 'created' into a timestamp (values that cannot be parsed become null)
#   3. discard rows whose 'created' or 'id' is null
changelog_cleaned = (
    changelog
    .dropna(subset=['field'])
    .withColumn('created', try_to_timestamp(col('created')))
    .dropna(subset=['created', 'id'])
)

In [3]:
# Seed the per-issue table: one row per issue id (plus its row count).
# Later cells join the derived per-issue features onto this frame.
changelog_issues = (
    changelog_cleaned
    .groupBy("id")
    .count()
)

# # get the latest change date per id
# window_spec = Window.partitionBy('id').orderBy(col('created').desc())
# latest_created = changelog_cleaned.withColumn('rn', row_number().over(window_spec)) \
# 	.filter(col('rn') == 1) \
# 	.select('id', col('created').alias('last_change_at'))

# # get the earliest change date per id
# window_spec_asc = Window.partitionBy('id').orderBy(col('created').asc())
# earliest_created = changelog_cleaned.withColumn('rn', row_number().over(window_spec_asc)) \
# 	.filter(col('rn') == 1) \
# 	.select('id', col('created').alias('first_change_at'))

# # join the latest and earliest created date to changelog_issues
# changelog_issues = changelog_issues \
# 	.join(latest_created, on='id', how='left') \
# 	.join(earliest_created, on='id', how='left')

In [None]:
from pyspark.sql import DataFrame

def get_extreme_field_value(df: DataFrame, field_name: str, order: str = "desc", select_col: str = "to", alias: str = None):
    """Return, per issue, one value of a changelog field taken from its latest or earliest change.

    Args:
        df: Changelog rows; must contain the 'id', 'field' and 'created'
            columns plus `select_col`.
        field_name: Value of the 'field' column to keep (e.g. 'timespent').
        order: "desc" selects the row with the most recent 'created' per
            issue; any other value selects the oldest one.
        select_col: Column whose value is taken from the selected row.
        alias: Optional new name for `select_col` in the result.

    Returns:
        A DataFrame with the columns 'id' and `select_col` (renamed to `alias`
        when one is given), holding exactly one row for every 'id' that has at
        least one change of `field_name`.
    """
    created = col('created')
    ordering = created.desc() if order == "desc" else created.asc()
    # Partition by 'id': it is the column returned here and the key used by
    # every other groupBy/join in this notebook. Partitioning by a different
    # column could yield several rows per 'id' and duplicate rows downstream.
    window_spec = Window.partitionBy('id').orderBy(ordering)
    ranked = (
        df.filter(col('field') == field_name)
        .withColumn('rn', row_number().over(window_spec))
    )
    # rn == 1 is the first row of each partition under the chosen ordering.
    selected = ranked.filter(col('rn') == 1).select('id', select_col)
    if alias:
        selected = selected.withColumnRenamed(select_col, alias)
    return selected

# Most recent 'timespent' value recorded for each issue
timespent_latest = get_extreme_field_value(
    changelog_cleaned,
    'timespent',
    order="desc",
    select_col="to",
    alias="time_spent",
)

# Most recent 'timeestimate' value recorded for each issue
timeestimate_latest = get_extreme_field_value(
    changelog_cleaned,
    'timeestimate',
    order="desc",
    select_col="to",
    alias="time_estimate",
)

# First 'timeestimate' value recorded for each issue
timeestimate_earliest = get_extreme_field_value(
    changelog_cleaned,
    'timeestimate',
    order="asc",
    select_col="to",
    alias="original_estimate",
)

In [5]:
# The three feature frames already carry their final column names
# ('time_spent', 'time_estimate', 'original_estimate') from the `alias`
# argument passed when they were built, so no renaming of 'to' is needed here.
# Left joins keep every issue; issues without a matching change get nulls.
changelog_issues = (
    changelog_issues
    .join(timespent_latest, on='id', how='left')
    .join(timeestimate_latest, on='id', how='left')
    .join(timeestimate_earliest, on='id', how='left')
)

In [6]:
from pyspark.sql.functions import sum as spark_sum, when, col

# Total number of changelog rows per issue, exposed as 'total_changes'
issue_changes = changelog_cleaned.groupBy("id").count().withColumnRenamed("count", "total_changes")

# Helper: per-issue number of changes that touch one specific field
def count_field_changes(df, field_name, alias):
    """Count, for every 'id' in `df`, the rows whose 'field' equals `field_name`.

    Args:
        df: Changelog rows with at least the 'id' and 'field' columns.
        field_name: Value of 'field' to count.
        alias: Name of the resulting count column.

    Returns:
        A DataFrame with the columns 'id' and `alias`. Every 'id' present in
        `df` appears once; ids with no matching row get a count of 0.
    """
    # 1 for a matching row, 0 otherwise, so the per-id sum is the match count.
    match_flag = when(col("field") == field_name, 1).otherwise(0)
    flagged = df.withColumn(alias, match_flag)
    return flagged.groupBy("id").agg(spark_sum(alias).alias(alias))

# Fields of interest, each paired with the name of its count column
fields = [
    ("status", "status_changes"),
    ("priority", "priority_changes"),
    ("assignee", "assignee_changes"),
    ("issuetype", "issuetype_changes"),
    ("resolution", "resolution_changes"),
    ("timeestimate", "timeestimate_changes"),
    ("description", "description_changes"),
    ("Fix Version", "fixversion_changes"),
]

# Count changes for each field
field_change_counts = {
    alias: count_field_changes(changelog_cleaned, field, alias)
    for field, alias in fields
}

# Join the total and the per-field counts onto changelog_issues
for df in [issue_changes, *field_change_counts.values()]:
    changelog_issues = changelog_issues.join(df, on="id", how="left")

# Count columns: the total plus one per entry in `fields`. Deriving the list
# from `fields` keeps the two from drifting apart when a field is added.
count_columns = ["total_changes", *(alias for _, alias in fields)]

# Cast counts to integer and replace nulls with 0. All columns are rewritten
# in a single withColumns() projection rather than one withColumn() call per
# column, which would add one projection to the query plan per column.
changelog_issues = changelog_issues.withColumns({
    col_name: when(col(col_name).isNull(), 0).otherwise(col(col_name).cast("integer"))
    for col_name in count_columns
})

In [7]:
# Number of times each issue had its status changed to "Reopened"
reopen_times = (
    changelog_cleaned
    .where((col("field") == "status") & (col("toString") == "Reopened"))
    .groupBy("id")
    .count()
    .withColumnRenamed("count", "reopen_times")
)

# Attach the reopen count to changelog_issues; issues that were never
# reopened have no row in reopen_times, so their null becomes 0.
changelog_issues = (
    changelog_issues
    .join(reopen_times, on="id", how="left")
    .withColumn(
        "reopen_times",
        when(col("reopen_times").isNull(), 0).otherwise(col("reopen_times").cast("integer")),
    )
)

In [8]:
# Number of distinct authors who made at least one change to each issue.
# Rows with a null or empty 'author' are ignored. (id, author) pairs are
# de-duplicated before counting so that an author with many changes on the
# same issue is counted once, not once per change.
unique_authors = (
    changelog_cleaned
    .filter(col("author") != "")
    .select("id", "author")
    .distinct()
    .groupBy("id")
    .count()
    .withColumnRenamed("count", "unique_authors")
)

# Join the author count onto changelog_issues
changelog_issues = changelog_issues.join(unique_authors, on="id", how="left")

# Issues with no attributed change have no row in unique_authors: fill with 0
changelog_issues = changelog_issues.withColumn(
    "unique_authors",
    when(col("unique_authors").isNull(), 0).otherwise(col("unique_authors").cast("integer"))
)

In [9]:
# # issue lifespan
# changelog_issues = changelog_issues.withColumn(
#     "workspan",
#     (col("last_change_at").cast("long") - col("first_change_at").cast("long")) / 86400  # Convert seconds to days
# )
# # Fill nulls with 0
# changelog_issues = changelog_issues.withColumn(
#     "workspan",
#     when(col("workspan").isNull(), 0).otherwise(col("workspan").cast("double"))
# )

# # note: the unit of workspan is days, not seconds

In [10]:
# # work tempo indicator: mean time between changes
# changelog_issues = changelog_issues.withColumn(
#     "mean_change_interval",
#     when(col("total_changes") > 1, (col("workspan") * 86400) / (col("total_changes") - 1)).otherwise(0)
# )
# # Fill nulls with 0
# changelog_issues = changelog_issues.withColumn(
#     "mean_change_interval",
#     when(col("mean_change_interval").isNull(), 0).otherwise(col("mean_change_interval").cast("double"))
# )

# # note: the unit of mean_change_interval is seconds
# # change to days
# changelog_issues = changelog_issues.withColumn(
#     "mean_change_interval",
#     col("mean_change_interval") / 86400  # Convert seconds to days
# )

In [11]:
# # max inactivity gap: group by issue id and calculate the maximum gap between changes
# from pyspark.sql import functions as F

# # Do not mutate changelog_cleaned, use temporary columns
# changelog_temp = changelog_cleaned.withColumn(
#     "_created_long",
#     col("created").cast("long")  # Convert to long for easier calculations
# )
# window_spec_inactivity = Window.partitionBy("id").orderBy("_created_long")
# changelog_temp = changelog_temp.withColumn(
#     "_prev_created_long",
#     F.lag("_created_long").over(window_spec_inactivity)
# )
# changelog_temp = changelog_temp.withColumn(
#     "_inactivity_gap",
#     when(col("_prev_created_long").isNull(), 0).otherwise(col("_created_long") - col("_prev_created_long"))
# )
# changelog_issues = changelog_issues.join(
#     changelog_temp.groupBy("id").agg(F.max("_inactivity_gap").alias("max_inactivity_gap")),
#     on="id",
#     how="left"
# )
# # Fill nulls with 0
# changelog_issues = changelog_issues.withColumn(
#     "max_inactivity_gap",
#     when(col("max_inactivity_gap").isNull(), 0).otherwise(col("max_inactivity_gap").cast("double"))
# )

# # note: the unit of max_inactivity_gap is seconds
# # change to days
# changelog_issues = changelog_issues.withColumn(
#     "max_inactivity_gap",
#     col("max_inactivity_gap") / 86400  # Convert seconds to days
# )

In [12]:
# Show the column names and types of the assembled per-issue feature table
changelog_issues.printSchema()

root
 |-- id: string (nullable = true)
 |-- count: long (nullable = false)
 |-- time_spent: string (nullable = true)
 |-- time_estimate: string (nullable = true)
 |-- original_estimate: string (nullable = true)
 |-- total_changes: integer (nullable = true)
 |-- status_changes: integer (nullable = true)
 |-- priority_changes: integer (nullable = true)
 |-- assignee_changes: integer (nullable = true)
 |-- issuetype_changes: integer (nullable = true)
 |-- resolution_changes: integer (nullable = true)
 |-- timeestimate_changes: integer (nullable = true)
 |-- description_changes: integer (nullable = true)
 |-- fixversion_changes: integer (nullable = true)
 |-- reopen_times: integer (nullable = true)
 |-- unique_authors: integer (nullable = true)



In [13]:
# Drop the helper 'count' column left over from the initial groupBy; the same
# per-issue row count is already available as 'total_changes'.
changelog_issues = changelog_issues.drop("count")

In [15]:
# save to csv

# # Explicitly cast timestamp columns to string to avoid OSError on toPandas
# timestamp_columns = ['last_change_at', 'first_change_at']
# for col_name in timestamp_columns:
#     changelog_issues = changelog_issues.withColumn(
#         col_name,
#         col(col_name).cast("string")
#     )

# Collect the result to the driver as a pandas DataFrame and write it out as
# a single CSV file without the pandas index column.
changelog_issues.toPandas().to_csv(
    "./apache/changelog_issues.csv",
    index=False,
)

In [16]:
# Release any cached data, then shut down the Spark session
spark.catalog.clearCache()
spark.stop()