In [0]:
# Load every column of the `data` table into the source DataFrame
source_query = "SELECT * FROM data"
df = spark.sql(source_query)

#### Reference variables

In [0]:
# Column names for user objects.
# Order is significant: other cells address this list by index, and the
# user DataFrames built from it are unioned by position.
user_cols = [
    "id", "avatar_url", "events_url", "followers_url",                # 0-3
    "following_url", "gists_url", "gravatar_id", "html_url",          # 4-7
    "login", "node_id", "organizations_url", "received_events_url",   # 8-11
    "repos_url", "site_admin", "starred_url", "subscriptions_url",    # 12-15
    "type", "url",                                                    # 16-17
]

# Top level actor cols (majority missing values)
# Only a handful of user fields exist as top-level actor_* columns. Each of
# those is mapped by field name to its source; every other field in
# `user_cols` becomes a NULL literal, so the resulting list has one entry per
# user column, in `user_cols` order, ready for a positional union.
actor_field_sources = {
    "id": col("actor_id").cast(LongType()),
    "avatar_url": "actor_avatar_url",
    "gravatar_id": "actor_gravatar_id",
    "login": "actor_login",
    # NOTE(review): every actor is labelled "User". The later max() over `type`
    # compares this literal with the `type` values coming from payload objects;
    # confirm that is the intended outcome for actors that are not plain users.
    "type": lit("User").alias("type"),
    "url": "actor_url",
}
# Guard against a field being renamed in `user_cols` without updating the map.
assert set(actor_field_sources) <= set(user_cols), "unknown user field in actor mapping"
actor_cols = [actor_field_sources.get(field, lit(None)) for field in user_cols]

# Top level org cols (majority missing values)
# Only a handful of user fields exist as top-level org_* columns. Each of
# those is mapped by field name to its source; every other field in
# `user_cols` becomes a NULL literal, so the resulting list has one entry per
# user column, in `user_cols` order, ready for a positional union.
org_field_sources = {
    "id": col("org_id").cast(LongType()),
    "avatar_url": "org_avatar_url",
    "gravatar_id": "org_gravatar_id",
    "login": "org_login",
    "type": lit("Organization").alias("type"),
    "url": "org_url",
}
# Guard against a field being renamed in `user_cols` without updating the map.
assert set(org_field_sources) <= set(user_cols), "unknown user field in org mapping"
org_cols = [org_field_sources.get(field, lit(None)) for field in user_cols]

In [0]:
# Paths to where user objects are found.
# The cells below address `paths` by index, so the order must not change:
# indices 0-15 are single user objects, 16-18 are arrays.
single_user_paths = [
    "payload_comment.user",                         # 0
    "payload_forkee.owner",                         # 1
    "payload_issue.milestone.creator",              # 2
    "payload_issue.assignee",                       # 3
    "payload_member",                               # 4
    "payload_pull_request.user",                    # 5
    "payload_pull_request.milestone.creator",       # 6
    "payload_pull_request.assignee",                # 7
    "payload_pull_request.auto_merge.enabled_by",   # 8
    "payload_pull_request.base.user",               # 9
    "payload_pull_request.base.repo.owner",         # 10
    "payload_pull_request.head.user",               # 11
    "payload_pull_request.head.repo.owner",         # 12
    "payload_pull_request.merged_by",               # 13
    "payload_review.user",                          # 14
    "payload_release.author",                       # 15
]
user_array_paths = [
    "payload_issue.assignees",                      # 16
    "payload_pull_request.assignees",               # 17
    "payload_release.assets",                       # 18
]
paths = single_user_paths + user_array_paths

#### User DataFrames from different object paths

##### Single user objects

In [0]:
# Top level actor: project the actor columns, de-duplicate, sort by actor id
actorDF = (
    df.select(*actor_cols)
    .distinct()
    .orderBy("actor_id")
)

# Top level org: project the org columns, keep rows that have an org id,
# de-duplicate, sort by org id
has_org_id = col("org_id").isNotNull()
orgDF = (
    df.select(*org_cols)
    .filter(has_org_id)
    .distinct()
    .orderBy("org_id")
)

In [0]:
# payload.member
# paths[4] == "payload_member". Rows where that object is NULL are dropped
# before projecting, so the filter refers to a column that exists on the frame
# it is applied to; the object's fields are then flattened, reduced to
# `user_cols` (in that order), de-duplicated and sorted by id.
memberDF = (
    df.filter(col(paths[4]).isNotNull())
    .select(f"{paths[4]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.comment.user
# paths[0] == "payload_comment.user". Rows where that object is NULL are
# dropped before projecting, so the filter refers to a column that exists on
# the frame it is applied to; the object's fields are then flattened, reduced
# to `user_cols` (in that order), de-duplicated and sorted by id.
comment_userDF = (
    df.filter(col(paths[0]).isNotNull())
    .select(f"{paths[0]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.forkee.owner
# paths[1] == "payload_forkee.owner". Rows where that object is NULL are
# dropped before projecting, so the filter refers to a column that exists on
# the frame it is applied to; the object's fields are then flattened, reduced
# to `user_cols` (in that order), de-duplicated and sorted by id.
forkee_ownerDF = (
    df.filter(col(paths[1]).isNotNull())
    .select(f"{paths[1]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.issue.milestone.creator
# paths[2] == "payload_issue.milestone.creator". Rows where that object is
# NULL are dropped before projecting, so the filter refers to a column that
# exists on the frame it is applied to; the object's fields are then
# flattened, reduced to `user_cols` (in that order), de-duplicated and sorted
# by id.
issue_milestone_creatorDF = (
    df.filter(col(paths[2]).isNotNull())
    .select(f"{paths[2]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.issue.assignee
# paths[3] == "payload_issue.assignee". Rows where that object is NULL are
# dropped before projecting, so the filter refers to a column that exists on
# the frame it is applied to; the object's fields are then flattened, reduced
# to `user_cols` (in that order), de-duplicated and sorted by id.
issue_assigneeDF = (
    df.filter(col(paths[3]).isNotNull())
    .select(f"{paths[3]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.user
# paths[5] == "payload_pull_request.user". Rows where that object is NULL are
# dropped before projecting, so the filter refers to a column that exists on
# the frame it is applied to; the object's fields are then flattened, reduced
# to `user_cols` (in that order), de-duplicated and sorted by id.
pull_request_userDF = (
    df.filter(col(paths[5]).isNotNull())
    .select(f"{paths[5]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.milestone.creator
# paths[6] == "payload_pull_request.milestone.creator". Rows where that object
# is NULL are dropped before projecting, so the filter refers to a column that
# exists on the frame it is applied to; the object's fields are then
# flattened, reduced to `user_cols` (in that order), de-duplicated and sorted
# by id.
pull_request_milestone_creatorDF = (
    df.filter(col(paths[6]).isNotNull())
    .select(f"{paths[6]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.assignee
# paths[7] == "payload_pull_request.assignee". Rows where that object is NULL
# are dropped before projecting, so the filter refers to a column that exists
# on the frame it is applied to; the object's fields are then flattened,
# reduced to `user_cols` (in that order), de-duplicated and sorted by id.
pull_request_assigneeDF = (
    df.filter(col(paths[7]).isNotNull())
    .select(f"{paths[7]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.auto_merge.enabled_by
# paths[8] == "payload_pull_request.auto_merge.enabled_by". Rows where that
# object is NULL are dropped before projecting, so the filter refers to a
# column that exists on the frame it is applied to; the object's fields are
# then flattened, reduced to `user_cols` (in that order), de-duplicated and
# sorted by id.
pull_request_auto_merge_enabled_byDF = (
    df.filter(col(paths[8]).isNotNull())
    .select(f"{paths[8]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.base.user
# paths[9] == "payload_pull_request.base.user". Rows where that object is NULL
# are dropped before projecting, so the filter refers to a column that exists
# on the frame it is applied to; the object's fields are then flattened,
# reduced to `user_cols` (in that order), de-duplicated and sorted by id.
pull_request_base_userDF = (
    df.filter(col(paths[9]).isNotNull())
    .select(f"{paths[9]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.base.repo.owner
# paths[10] == "payload_pull_request.base.repo.owner". Rows where that object
# is NULL are dropped before projecting, so the filter refers to a column that
# exists on the frame it is applied to; the object's fields are then
# flattened, reduced to `user_cols` (in that order), de-duplicated and sorted
# by id.
pull_request_base_repo_ownerDF = (
    df.filter(col(paths[10]).isNotNull())
    .select(f"{paths[10]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.head.user
# paths[11] == "payload_pull_request.head.user". Rows where that object is
# NULL are dropped before projecting, so the filter refers to a column that
# exists on the frame it is applied to; the object's fields are then
# flattened, reduced to `user_cols` (in that order), de-duplicated and sorted
# by id.
pull_request_head_userDF = (
    df.filter(col(paths[11]).isNotNull())
    .select(f"{paths[11]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.head.repo.owner
# paths[12] == "payload_pull_request.head.repo.owner". Rows where that object
# is NULL are dropped before projecting, so the filter refers to a column that
# exists on the frame it is applied to; the object's fields are then
# flattened, reduced to `user_cols` (in that order), de-duplicated and sorted
# by id.
pull_request_head_repo_ownerDF = (
    df.filter(col(paths[12]).isNotNull())
    .select(f"{paths[12]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.merged_by
# paths[13] == "payload_pull_request.merged_by". Rows where that object is
# NULL are dropped before projecting, so the filter refers to a column that
# exists on the frame it is applied to; the object's fields are then
# flattened, reduced to `user_cols` (in that order), de-duplicated and sorted
# by id.
pull_request_merged_byDF = (
    df.filter(col(paths[13]).isNotNull())
    .select(f"{paths[13]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.review.user
# paths[14] == "payload_review.user". Rows where that object is NULL are
# dropped before projecting, so the filter refers to a column that exists on
# the frame it is applied to; the object's fields are then flattened, reduced
# to `user_cols` (in that order), de-duplicated and sorted by id.
review_userDF = (
    df.filter(col(paths[14]).isNotNull())
    .select(f"{paths[14]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.release.author
# paths[15] == "payload_release.author". Rows where that object is NULL are
# dropped before projecting, so the filter refers to a column that exists on
# the frame it is applied to; the object's fields are then flattened, reduced
# to `user_cols` (in that order), de-duplicated and sorted by id.
release_authorDF = (
    df.filter(col(paths[15]).isNotNull())
    .select(f"{paths[15]}.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

##### User arrays

In [0]:
# payload.issue.assignees
# paths[16] == "payload_issue.assignees", an array of user objects: one output
# row per array element, flattened to `user_cols`, de-duplicated, sorted by id.
issue_assigneesDF = (
    df.select(explode(paths[16]).alias("user"))
    # explode() emits no rows for NULL or empty arrays, so no size check is
    # needed; NULL *elements* are what must be removed, otherwise each one
    # turns into an all-NULL user row.
    .filter(col("user").isNotNull())
    .select("user.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.pull_request.assignees
# paths[17] == "payload_pull_request.assignees", an array of user objects: one
# output row per array element, flattened to `user_cols`, de-duplicated,
# sorted by id.
pull_request_assigneesDF = (
    df.select(explode(paths[17]).alias("user"))
    # explode() emits no rows for NULL or empty arrays, so no size check is
    # needed; NULL *elements* are what must be removed, otherwise each one
    # turns into an all-NULL user row.
    .filter(col("user").isNotNull())
    .select("user.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

In [0]:
# payload.release.assets[asset.uploader]
# paths[18] == "payload_release.assets"; taking `.uploader` of that array
# yields the array of each asset's uploader object. One output row per
# uploader, flattened to `user_cols`, de-duplicated, sorted by id.
payload_release_assets_uploaderDF = (
    df.select(explode(f"{paths[18]}.uploader").alias("user"))
    # explode() emits no rows for NULL or empty arrays, so no size check is
    # needed; NULL *elements* (an asset without an uploader) are what must be
    # removed, otherwise each one turns into an all-NULL user row.
    .filter(col("user").isNotNull())
    .select("user.*")
    .select(user_cols)
    .distinct()
    .orderBy("id")
)

#### Union all DFs

In [0]:
# Stack every payload-derived user frame, in the same order as before, then
# de-duplicate once. union() matches columns by position; each frame listed
# here was projected with `user_cols`, so the layouts line up.
payload_user_dfs = [
    comment_userDF,
    forkee_ownerDF,
    issue_milestone_creatorDF,
    issue_assigneeDF,
    memberDF,
    pull_request_userDF,
    pull_request_milestone_creatorDF,
    pull_request_assigneeDF,
    pull_request_auto_merge_enabled_byDF,
    pull_request_base_userDF,
    pull_request_base_repo_ownerDF,
    pull_request_head_userDF,
    pull_request_head_repo_ownerDF,
    pull_request_merged_byDF,
    review_userDF,
    release_authorDF,
    issue_assigneesDF,
    pull_request_assigneesDF,
    payload_release_assets_uploaderDF,
]

userDF = payload_user_dfs[0]
for other_df in payload_user_dfs[1:]:
    userDF = userDF.union(other_df)

# A single distinct() over the whole union produces the same set of rows as
# de-duplicating after every individual union, with one de-duplication step
# in the plan instead of eighteen.
userDF = userDF.distinct()

In [0]:
# Append the top-level actor and org frames (most of their fields are NULL
# literals) to the payload-derived users, then sort the result by id.
# Both unions are positional.
userDF = (
    userDF
    .union(actorDF)
    .union(orgDF)
    .orderBy("id")
)

In [0]:
# Max hack: collapse to one row per id, taking the max of every other field so
# that NULLs are replaced whenever some row for that id carries a value.
# `max` is whatever the notebook has bound to that name; it has to be the
# Spark aggregate, since the result is given an alias.
merged_fields = [max(field).alias(field) for field in user_cols[1:]]
userDF = userDF.groupBy("id").agg(*merged_fields)

#### Write to database

In [0]:
# Destination of the user table and the number of partitions to write
silver_user_path = "abfss://team1-project2@20230821desa.dfs.core.windows.net/SilverLayer/user"
output_partitions = 15

# NOTE(review): no save mode is set, so the writer's default applies —
# presumably this fails if the path already exists; confirm before re-running.
userDF.repartition(output_partitions).write.parquet(silver_user_path)