### df

In [0]:
contname = 'team4-project2' #azure storage account container
storage_acct_name = '20230821desa'

file_pattern = f"abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/BronzeLayer/day_of_the_month=10/"

df = spark.read.format('parquet').load(f'{file_pattern}').withColumnRenamed('id', 'event_id')

###imports

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
from pyspark.sql.functions import cast, col, to_timestamp, when, explode,size
from pyspark.sql import DataFrame
from pyspark.sql.functions import broadcast

###functions

In [0]:
# rename_cols(df.filter(col('payload.issue').isNotNull()), 'issue').display()

def rename_payload_cols(given_df, title):

    # Get the list of all columns under the specified nested column
    nested_columns = given_df.select(f'payload.{title}.*').columns

    # Start with a DataFrame containing the non-nested columns
    new_columns = [f'payload.{title}.{col}' for col in nested_columns]
    return_df = given_df.select('event_id', 'type', *new_columns)

    for column in nested_columns:
        return_df = return_df.withColumnRenamed(column, f'{title}_{column}')

    return return_df


def subDF(given_df, title):
    nested_columns = given_df.select(f'{title}.*').columns
    new_columns = [f'{title}.{col}' for col in nested_columns]
    return_df = given_df.select('event_id', *new_columns)

    for column in nested_columns:
        return_df = return_df.withColumnRenamed(column, f'{title}_{column}')

    return return_df


def subDFWithMasterID(given_df, target_table, id_column):
    nested_columns = given_df.select(f'{target_table}.*').columns
    new_columns = [f'{target_table}.{col}' for col in nested_columns]
    return_df = given_df.select('event_id', f'master_{id_column}_id', *new_columns)

    for column in nested_columns:
        return_df = return_df.withColumnRenamed(column, f'{target_table}_{column}')

    return return_df


def add_index(df, event_type, start_index = 1):
    df_with_index = df.rdd.zipWithIndex().map(lambda x: (x[1]+start_index, *x[0]))

    schema = StructType([
        StructField(f"{event_type}_id", IntegerType(), False),
        *df.schema.fields
    ])

    df_with_index = df_with_index.toDF(schema=schema)

    return df_with_index


def drop_null_columns(df: DataFrame) -> DataFrame:
    null_columns = [col_name for col_name in df.columns if df.filter(col(col_name).isNotNull()).count() == 0]
    df = df.drop(*null_columns)
    return df


def subDFWithMasterID2(given_df, target_table, id_column,*args):
    nested_columns = given_df.select(f'{target_table}.*').columns
    new_columns = [f'{target_table}.{col}' for col in nested_columns]
    return_df = given_df.select('event_id', f'{id_column}', *new_columns,*args)
    for column in nested_columns:
        return_df = return_df.withColumnRenamed(column, f'{target_table}_{column}')
    return (return_df)


In [0]:
adls_container = 'team4-project2' #azure storage account container
adls_account_name = '20230821desa'
client_id = 'de4ff859-02b1-4e2f-9d16-b578fa03df4f' #aka: app id
tenant_id = '33da9f3f-4c1a-4640-8ce1-3f63024aea1d' #aka: directory id

service_credential = dbutils.secrets.get(scope="databricks-app-kv",key="databricks-application")

spark.conf.set(f"fs.azure.account.auth.type.{adls_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{adls_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{adls_account_name}.dfs.core.windows.net", f"{client_id}")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{adls_account_name}.dfs.core.windows.net", service_credential)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{adls_account_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

def write_df_to_adls(df, numPartitions, partitionByCol, folder_name , overwrite=False): 
    # Write the DataFrame to ADLS
    df.repartition(1).write.mode("overwrite" if overwrite else "append").parquet(f'abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/SilverLayer/{folder_name}')


###actor_df

In [0]:
# selecting actor columns from df
actor_cols = [col for col in df.columns if col.startswith("actor")]
actor_df = df.select(*actor_cols ,"created_at")

# casting created_at to timestamp
actor_df = actor_df.withColumn('created_at', to_timestamp('created_at'))

# I seleting most updated info per actor_id
new_actor_cols = [col for col in actor_df.columns if col.startswith("actor")]

actor_df = actor_df.groupBy('actor_id').agg({str(new_actor_cols[0]): 'max', str(new_actor_cols[1]): 'max', str(new_actor_cols[2]): 'max', str(new_actor_cols[3]): 'max', str(new_actor_cols[4]): 'max', str(new_actor_cols[5]): 'max', "created_at": "max"}).drop('actor_id', 'max(created_at)')

# renameing max columns
max_actor_cols = [col for col in actor_df.columns if col.startswith("max")]

actor_df = actor_df.withColumnRenamed('max(actor_avatar_url)', 'actor_avatar_url').withColumnRenamed('max(actor_display_login)', 'actor_display_login').withColumnRenamed('max(actor_gravatar_id)', 'actor_gravatar_id').withColumnRenamed('max(actor_url)', 'actor_url').withColumnRenamed('max(actor_id)', 'actor_id').withColumnRenamed('max(actor_login)', 'actor_login')

actor_df = actor_df.select('actor_id', 'actor_login', 'actor_display_login', 'actor_avatar_url', 'actor_url')

write_df_to_adls(actor_df, 7, 'actor_id', 'actors' , True)

### org_df

In [0]:
# selecting org columns from df
org_cols = [col for col in df.columns if col.startswith("org")]
org_df = df.select("event_id", *org_cols ,"created_at").where(df.org_id.isNotNull())

# casting created_at to timestamp
org_df = org_df.withColumn('created_at', to_timestamp('created_at'))

# I seleting most updated info per actor_id
org_df = org_df.groupBy('org_id').agg({str(org_cols[0]): 'max', str(org_cols[1]): 'max', str(org_cols[2]): 'max', str(org_cols[3]): 'max', str(org_cols[4]): 'max'}).drop('org_id', 'max(created_at)')

# renameing max columns
max_org_cols = [col for col in org_df.columns if col.startswith("max")]

org_df = org_df.withColumnRenamed('max(org_avatar_url)', 'org_avatar_url').withColumnRenamed('max(org_gravatar_id)', 'org_gravatar_id').withColumnRenamed('max(org_id)', 'org_id').withColumnRenamed('max(org_login)', 'org_login').withColumnRenamed('max(org_url)', 'org_url')

org_df = org_df.select('org_id', 'org_login', 'org_url', 'org_avatar_url')


write_df_to_adls(org_df, 1, 'org_id', 'orgs' , True)

###repo_df

In [0]:
# selecting org columns from df
repo_cols = [col for col in df.columns if col.startswith("repo")]
repo_df = df.select("event_id", *repo_cols ,"created_at").where(df.repo_id.isNotNull())

# casting created_at to timestamp
repo_df = repo_df.withColumn('created_at', to_timestamp('created_at'))

# I seleting most updated info per actor_id
repo_df = repo_df.groupBy('repo_id').agg({str(repo_cols[0]): 'max', str(repo_cols[1]): 'max', str(repo_cols[2]): 'max', 'event_id': 'max'}).drop('repo_id', 'max(created_at)')

# renameing max columns
max_repo_cols = [col for col in repo_df.columns if col.startswith("max")]

repo_df = repo_df.withColumnRenamed('max(repo_id)', 'repo_id').withColumnRenamed('max(repo_name)', 'repo_name').withColumnRenamed('max(repo_url)', 'repo_url').withColumnRenamed('max(event_id)', 'event_id')

repo_df = repo_df.select('repo_id', 'repo_name', 'repo_url')

write_df_to_adls(repo_df, 1, 'repo_id', 'repo' , True)

In [0]:
df = df.drop('actor_loin', 'actor_avatar_url', 'actor_display_login', 'actor_gravatar_id', 'actor_login', 'actor_url' , 'repo_name', 'repo_url', 'org_avatar_url', 'org_gravatar_id', 'org_login', 'org_url')

### master_issues

In [0]:
master_issues = df.filter(col('payload.issue').isNotNull())
master_issues = rename_payload_cols(master_issues, 'issue')
master_issues = add_index(master_issues, 'master_issue')

### issue_assignees

In [0]:
# issue assignees 

master_issues = master_issues.drop('issue_assignee')

issue_assignees = master_issues.filter(size(col('issue_assignees')) > 0).select('*', explode('issue_assignees').alias('issue_assignee'))

issue_assignees = subDFWithMasterID(issue_assignees, 'issue_assignee', 'issue')

issue_assignees = add_index(issue_assignees, 'user').withColumnRenamed('event_id', 'issue_event_id')

issue_assignees_temp = issue_assignees.drop('master_issue_id')
# Creating null value in the master_users dataframe

master_users = spark.createDataFrame([(0,  None, None, None, None, None, None, None, None,  None, None, None, None, None, None, None, None, None,None, None)], issue_assignees_temp.schema)

master_users_count = master_users.count()

master_users = master_users.union(issue_assignees_temp)

master_users = master_users.drop_duplicates(subset = ['user_id'])


issue_assignees = issue_assignees.withColumnRenamed('user_id', 'assignee_id').drop(*issue_assignees_temp.columns)

# now have issues_assignees table with master event_id from the issues table
# We have dropped all columns expect assinee id created for the master_users table and the master_issue_id

write_df_to_adls(issue_assignees, 1, 'master_issue_id', 'issue_assignees', True)


### issue_users

In [0]:
# Seperates Issues Users tables from the main issues table and joins to the master Users 
# and back to the master_issues table

issue_users = master_issues.filter(col('issue_user').isNotNull())
issue_users = subDF(issue_users, 'issue_user')

issue_users = add_index(issue_users, 'user', master_users_count).withColumnRenamed('event_id', 'issue_event_id')


# Linking the issue_users with the master user's table and addding new values to the count
master_users = master_users.union(issue_users).drop_duplicates(subset = ['issue_assignee_id'])


master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
master_issues = (
            master_issues.join(broadcast(master_users), master_issues.issue_user.id 
            == master_users.issue_assignee_id, how= 'left')
 )

master_issues.cache()

master_issues = master_issues.withColumnRenamed('user_id', 'users_id').drop(*master_users.columns)
master_issues = master_issues.fillna(0, subset = ['users_id'])

master_issues.unpersist()

DataFrame[master_issue_id: int, event_id: string, type: string, issue_active_lock_reason: string, issue_assignees: array<struct<avatar_url:string,events_url:string,followers_url:string,following_url:string,gists_url:string,gravatar_id:string,html_url:string,id:bigint,login:string,node_id:string,organizations_url:string,received_events_url:string,repos_url:string,site_admin:boolean,starred_url:string,subscriptions_url:string,type:string,url:string>>, issue_author_association: string, issue_body: string, issue_closed_at: string, issue_comments: bigint, issue_comments_url: string, issue_created_at: string, issue_draft: boolean, issue_events_url: string, issue_html_url: string, issue_id: bigint, issue_labels: array<struct<color:string,default:boolean,description:string,id:bigint,name:string,node_id:string,url:string>>, issue_labels_url: string, issue_locked: boolean, issue_milestone: struct<closed_at:string,closed_issues:bigint,created_at:string,creator:struct<avatar_url:string,events_url:

### issue_milestone

In [0]:
# new milestone logic for issues

issue_milestone = master_issues.filter(col('issue_milestone').isNotNull())
# issue_milestone = issue_milestone.withColumn('pull_request_milestone_id', lit(0))
issue_milestone = subDFWithMasterID(issue_milestone, 'issue_milestone', 'issue')

# create milestone's creator tables and links that with milestone table
issue_milestone_creator = issue_milestone.filter(col('issue_milestone_creator').isNotNull())
issue_milestone_creator = subDF(issue_milestone, 'issue_milestone_creator')

issue_milestone_creator = add_index(issue_milestone_creator, 'interm', master_users_count).withColumnRenamed('event_id', 'issue_milestone_event_id')

master_users = master_users.union(issue_milestone_creator).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
issue_milestone = (
            issue_milestone.join(broadcast(master_users), issue_milestone.issue_milestone_creator.id
            == master_users.issue_assignee_id, how= 'left')
)

issue_milestone.cache()

issue_milestone = issue_milestone.withColumnRenamed('user_id', 'creator_id').drop(*master_users.columns,'issue_milestone_creator').fillna(0, subset = ['creator_id'])

issue_milestone.unpersist()

write_df_to_adls(issue_milestone, 1, 'master_issue_id', 'issue_milestone', overwrite= True)


### issue_pull_requests

In [0]:
issue_pull_requests = master_issues.filter(col('issue_pull_request').isNotNull())

issue_pull_requests = subDFWithMasterID(issue_pull_requests, 'issue_pull_request', 'issue')

write_df_to_adls(issue_pull_requests, 10, 'master_issue_id', 'issue_pull_requests', overwrite= True)


### issue_pvga

In [0]:
issue_pvga = master_issues.filter(col('issue_performed_via_github_app').isNotNull())
issue_pvga = subDFWithMasterID(issue_pvga, 'issue_performed_via_github_app', 'issue')

# create pvga's owner tables and links that with pvga table
issue_pvga_owner = issue_pvga.filter(col('issue_performed_via_github_app_owner').isNotNull())
issue_pvga_owner = subDF(issue_pvga, 'issue_performed_via_github_app_owner')

issue_pvga_owner = add_index(issue_pvga_owner, 'interm', master_users_count).withColumnRenamed('event_id', 'issue_pvga_event_id')

master_users = master_users.union(issue_pvga_owner).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

issue_pvga = (
            master_users.join(broadcast(issue_pvga), issue_pvga.issue_performed_via_github_app_owner.id 
            == master_users.issue_assignee_id, how= 'right')
 )

issue_pvga.cache()

issue_pvga = issue_pvga.withColumnRenamed('user_id', 'owner_id').drop(*master_users.columns, 'issue_performed_via_github_app_owner').fillna(0, subset= ['owner_id'])

issue_pvga.unpersist()


# 2 columns in issue PVGA - one is for issue events and the other is for comment events- they can all be diff numbers bc in different columns 

DataFrame[owner_id: int, event_id: string, master_issue_id: int, issue_performed_via_github_app_created_at: string, issue_performed_via_github_app_description: string, issue_performed_via_github_app_events: array<string>, issue_performed_via_github_app_external_url: string, issue_performed_via_github_app_html_url: string, issue_performed_via_github_app_id: bigint, issue_performed_via_github_app_name: string, issue_performed_via_github_app_node_id: string, issue_performed_via_github_app_permissions: struct<actions:string,administration:string,checks:string,codespace_metadata:string,content_references:string,contents:string,deployments:string,discussions:string,emails:string,followers:string,gists:string,issues:string,keys:string,members:string,metadata:string,organization_hooks:string,organization_packages:string,organization_projects:string,organization_user_blocking:string,packages:string,pull_requests:string,pull_requests_comment_only_reviews:string,pull_requests_from_forks:string,re

### issue_pvga_permission

In [0]:
# issue_pvga_permissions = subDF(issue_pvga, 'issue_performed_via_github_app_permissions')

issue_pvga = add_index(issue_pvga, 'master_issue_pvga')

issue_pvga_permissions = issue_pvga.filter(col('issue_performed_via_github_app_permissions').isNotNull())

issue_pvga_permissions = subDFWithMasterID(issue_pvga_permissions, 'issue_performed_via_github_app_permissions', 'issue_pvga')

# # Use these to srater the counts for the next thing ie) pvga in the pull_requests table 

issue_pvga = issue_pvga.drop('issue_performed_via_github_app_permissions')

### issue_pvga_events

In [0]:
# This is the new issues pvga events setup 
issue_pvga_events = issue_pvga.filter(size(col('issue_performed_via_github_app_events')) > 0)
issue_pvga_events = issue_pvga_events.select('*', explode('issue_performed_via_github_app_events').alias('issue_pvga_events'))

issue_pvga_events = issue_pvga_events.select('issue_pvga_events', 'master_issue_pvga_id')

issue_pvga = issue_pvga.drop('issue_performed_via_github_app_events')


write_df_to_adls(issue_pvga, 1, 'master_issue_id', 'issue_pvga', overwrite= True)
write_df_to_adls(issue_pvga_events, 1, 'master_issue_pvga_id', 'issue_pvga_events', overwrite= True)
write_df_to_adls(issue_pvga_permissions, 1, 'master_issue_pvga_id', 'issue_pvga_permissions', overwrite= True)



### issue_labels

In [0]:
issue_labels = master_issues.filter(size(col('issue_labels')) > 0).select('*', explode('issue_labels').alias('issue_label'))

issue_labels = subDFWithMasterID(issue_labels, 'issue_label', 'issue')

write_df_to_adls(issue_labels, 4, 'master_issue_id', 'issue_labels', overwrite= True)


### drop issue cols

In [0]:
master_issues = master_issues.drop('issue_labels', 'issue_pull_request', 'issue_milestone', 'issue_performed_via_github_app', 'issue_reactions', 'issue_assignees', 'issue_user', 'issue_active_lock_reason')




### Comments_master

In [0]:
comments_master = df.filter(col('payload.comment').isNotNull())
comments_master = rename_payload_cols(comments_master, 'comment')
comments_master = add_index(comments_master, 'master_comment')

### comments_user

In [0]:
# Seperates Issues Users tables from the main issues table and joins to the master Users 
# and back to the comments_master table

comment_users = comments_master.filter(col('comment_user').isNotNull())
comment_users = subDF(comment_users, 'comment_user')

comment_users = add_index(comment_users, 'user', master_users_count).withColumnRenamed('event_id', 'issue_event_id')


# Linking the comment_users with the master user's table and addding new values to the count
master_users = master_users.union(comment_users).drop_duplicates(subset = ['issue_assignee_id'])


master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
comments_master = (
            comments_master.join(broadcast(master_users), comments_master.comment_user.id 
            == master_users.issue_assignee_id, how= 'left')
 )

comments_master.cache()

comments_master = comments_master.withColumnRenamed('user_id', 'users_id').drop(*master_users.columns, 'comment_user').fillna(0, subset = ['users_id'])

comments_master.unpersist()


DataFrame[master_comment_id: int, event_id: string, type: string, comment__links: struct<html:struct<href:string>,pull_request:struct<href:string>,self:struct<href:string>>, comment_author_association: string, comment_body: string, comment_commit_id: string, comment_created_at: string, comment_diff_hunk: string, comment_html_url: string, comment_id: bigint, comment_in_reply_to_id: bigint, comment_issue_url: string, comment_line: bigint, comment_node_id: string, comment_original_commit_id: string, comment_original_line: bigint, comment_original_position: bigint, comment_original_start_line: bigint, comment_path: string, comment_performed_via_github_app: struct<created_at:string,description:string,events:array<string>,external_url:string,html_url:string,id:bigint,name:string,node_id:string,owner:struct<avatar_url:string,events_url:string,followers_url:string,following_url:string,gists_url:string,gravatar_id:string,html_url:string,id:bigint,login:string,node_id:string,organizations_url:st

### comments_pvga

In [0]:
# comments 
comments_pvga = comments_master.filter(col('comment_performed_via_github_app').isNotNull())

comments_pvga = subDFWithMasterID(comments_pvga, 'comment_performed_via_github_app', 'comment')

# create pvga's owner tables and links that with pvga table
comments_pvga_owner = comments_pvga.filter(col('comment_performed_via_github_app_owner').isNotNull())
comments_pvga_owner = subDF(comments_pvga, 'comment_performed_via_github_app_owner')

comments_pvga_owner = add_index(comments_pvga_owner, 'interm', master_users_count).withColumnRenamed('event_id', 'comment_pvga_event_id')

master_users = master_users.union(comments_pvga_owner).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

comments_pvga = (
            master_users.join(broadcast(comments_pvga), comments_pvga.comment_performed_via_github_app_owner.id 
            == master_users.issue_assignee_id, how= 'right')
 )

comments_pvga.cache()

comments_pvga = comments_pvga.withColumnRenamed('user_id', 'owner_id').drop(*master_users.columns,'comment_performed_via_github_app_owner').fillna(0, subset= ['owner_id'])

comments_pvga.unpersist()

write_df_to_adls(comments_pvga, 10, 'master_comment_id', 'comments_pvga', overwrite= True)


### comments_pvga_permission

In [0]:
# doing pvga permissions for the comments table

comments_pvga = add_index(comments_pvga, 'master_comments_pvga')

comments_pvga_permissions = comments_pvga.filter(col('comment_performed_via_github_app_permissions').isNotNull())

comments_pvga_permissions = subDFWithMasterID(comments_pvga_permissions, 'comment_performed_via_github_app_permissions', 'comments_pvga')

# merging two permissions tables and setting up the count on permissions

# Use these to srater the counts for the next thing ie) pvga in the pull_requests table 

comments_pvga = comments_pvga.drop('comment_performed_via_github_app_permissions')

write_df_to_adls(comments_pvga_permissions, 1, 'master_comments_pvga_id', 'comments_pvga_permissions', overwrite= True)


### comments_pvga_events

In [0]:
comments_pvga_events = comments_pvga.filter(size(col('comment_performed_via_github_app_events')) > 0)
comments_pvga_events = comments_pvga_events.select('*', explode('comment_performed_via_github_app_events').alias('comment_pvga_events'))

comments_pvga_events = comments_pvga_events.select('comment_pvga_events', 'master_comments_pvga_id')

comments_pvga = comments_pvga.drop('comment_performed_via_github_app_events')


write_df_to_adls(comments_pvga_events, 1, 'master_comments_pvga_id', 'comments_pvga_events', overwrite= True)


write_df_to_adls(comments_pvga, 1, 'master_comments_pvga_id', 'comments_pvga', overwrite= True)



### comments_links
- creatig new cols out of nested column
- dropping unnesscary columns

In [0]:
comments_master = (
    comments_master.withColumn('comments_link_html', comments_master.comment__links.html.href).withColumn('comments_link_pull_request', comments_master.comment__links.pull_request.href).withColumn('comments_link_self', comments_master.comment__links.self.href).drop('comment__links')
    )

comments_master = comments_master.drop('comment_performed_via_github_app', 'comment_reactions')



### Pull Requests

In [0]:
# pull request assignees and table creation with master_id

master_pull_requests = df.filter(col('payload.pull_request').isNotNull())
master_pull_requests = rename_payload_cols(master_pull_requests, 'pull_request')
master_pull_requests = add_index(master_pull_requests, 'master_pull_request')




### pull_request_assignees

In [0]:
master_pull_requests = master_pull_requests.drop('pull_request_assignee')

pull_request_assignees = master_pull_requests.filter(size(col('pull_request_assignees')) > 0).select('*', explode('pull_request_assignees').alias('pull_request_assignee'))

pull_request_assignees = subDFWithMasterID(pull_request_assignees, 'pull_request_assignee', 'pull_request')

pull_request_assignees = add_index(pull_request_assignees, 'user', master_users_count).withColumnRenamed('user_id', 'user_temp_id')

pull_request_assignees_temp = pull_request_assignees.drop('master_pull_request_id')

# Creating null value in the master_users dataframe

master_users = master_users.union(pull_request_assignees_temp).drop_duplicates(subset = ['issue_assignee_id'])

master_users_count = master_users.count()

# pull_request_assignees = pull_request_assignees.withColumnRenamed('user_id', 'assignee_id').drop(*master_users.columns)

pull_request_assignees = (
            master_users.join(broadcast(pull_request_assignees), pull_request_assignees.pull_request_assignee_id
            == master_users.issue_assignee_id, how= 'right'))

pull_request_assignees.cache()

pull_request_assignees = pull_request_assignees.withColumnRenamed('user_id', 'assignee_id').drop(*master_users.columns, *pull_request_assignees_temp.columns).fillna(0, subset = ['assignee_id'])

pull_request_assignees.unpersist()

# # now have issues_assignees table with master event_id from the issues table
# # We have dropped all columns expect assinee id created for the master_users table and the master_issue_id


write_df_to_adls(pull_request_assignees, 1, 'master_pull_request_id', 'pull_request_assignees', overwrite= True)



### pull_request_users

In [0]:
# pull request users

pull_request_users = master_pull_requests.filter(col('pull_request_user').isNotNull())
pull_request_users = subDF(pull_request_users, 'pull_request_user')

pull_request_users = add_index(pull_request_users, 'user', master_users_count).withColumnRenamed('event_id', 'pr_event_id')

# Linking the pull_request_users with the master user's table and addding new values to the count
master_users = master_users.union(pull_request_users).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
master_pull_request = (
            master_pull_requests.join(master_users, master_pull_requests.pull_request_user.id 
            == master_users.issue_assignee_id, how= 'left')
 )

master_pull_request.cache()

master_pull_request = master_pull_request.withColumnRenamed('user_id', 'users_id').drop(*master_users.columns,'pull_request_user').fillna(0, subset = ['users_id'])

master_pull_request.unpersist()

DataFrame[master_pull_request_id: int, event_id: string, type: string, pull_request__links: struct<comments:struct<href:string>,commits:struct<href:string>,html:struct<href:string>,issue:struct<href:string>,review_comment:struct<href:string>,review_comments:struct<href:string>,self:struct<href:string>,statuses:struct<href:string>>, pull_request_active_lock_reason: string, pull_request_additions: bigint, pull_request_assignees: array<struct<avatar_url:string,events_url:string,followers_url:string,following_url:string,gists_url:string,gravatar_id:string,html_url:string,id:bigint,login:string,node_id:string,organizations_url:string,received_events_url:string,repos_url:string,site_admin:boolean,starred_url:string,subscriptions_url:string,type:string,url:string>>, pull_request_author_association: string, pull_request_auto_merge: struct<commit_message:string,commit_title:string,enabled_by:struct<avatar_url:string,events_url:string,followers_url:string,following_url:string,gists_url:string,gr


### pull_request_merged_by
- master_user table logic

In [0]:
# pull request merged by logic 

pull_request_merged_by = master_pull_requests.filter(col('pull_request_merged_by').isNotNull())
pull_request_merged_by = subDF(pull_request_merged_by, 'pull_request_merged_by')

pull_request_merged_by = add_index(pull_request_merged_by, 'user', master_users_count).withColumnRenamed('event_id', 'pr_event_id')

# Linking the pull_request_merged_by with the master user's table and addding new values to the count
master_users = master_users.union(pull_request_merged_by).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
master_pull_requests = (
            master_pull_requests.join(master_users, master_pull_requests.pull_request_merged_by.id 
            == master_users.issue_assignee_id, how= 'left')
 )

master_pull_requests.cache()
master_pull_requests = master_pull_requests.withColumnRenamed('user_id', 'merged_by_id').drop(*master_users.columns,'pull_request_merged_by').fillna(0, subset = ['merged_by_id'])
master_pull_requests.unpersist()


DataFrame[master_pull_request_id: int, event_id: string, type: string, pull_request__links: struct<comments:struct<href:string>,commits:struct<href:string>,html:struct<href:string>,issue:struct<href:string>,review_comment:struct<href:string>,review_comments:struct<href:string>,self:struct<href:string>,statuses:struct<href:string>>, pull_request_active_lock_reason: string, pull_request_additions: bigint, pull_request_assignees: array<struct<avatar_url:string,events_url:string,followers_url:string,following_url:string,gists_url:string,gravatar_id:string,html_url:string,id:bigint,login:string,node_id:string,organizations_url:string,received_events_url:string,repos_url:string,site_admin:boolean,starred_url:string,subscriptions_url:string,type:string,url:string>>, pull_request_author_association: string, pull_request_auto_merge: struct<commit_message:string,commit_title:string,enabled_by:struct<avatar_url:string,events_url:string,followers_url:string,following_url:string,gists_url:string,gr


### pull_request_milestone

In [0]:
# new milestone logic for issues



pull_request_milestone = master_pull_requests.filter(col('pull_request_milestone').isNotNull())
pull_request_milestone = subDFWithMasterID(pull_request_milestone, 'pull_request_milestone', 'pull_request')
# pull_request_milestone = pull_request_milestone.withColumn('issue_milestone_id', lit(0))

# create milestone's creator tables and links that with milestone table
pull_request_milestone_creator = pull_request_milestone.filter(col('pull_request_milestone_creator').isNotNull())
pull_request_milestone_creator = subDF(pull_request_milestone, 'pull_request_milestone_creator')

pull_request_milestone_creator = add_index(pull_request_milestone_creator, 'interm', master_users_count).withColumnRenamed('event_id', 'pr_milestone_event_id')

master_users = master_users.union(pull_request_milestone_creator).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

pull_request_milestone = (
            master_users.join(broadcast(pull_request_milestone), pull_request_milestone.pull_request_milestone_creator.id 
            == master_users.issue_assignee_id, how= 'right')
 )
pull_request_milestone.cache()

pull_request_milestone = pull_request_milestone.withColumnRenamed('user_id', 'creator_id').drop(*master_users.columns,'pull_request_milestone_creator').fillna(0, subset = ['creator_id'])

pull_request_milestone.unpersist()


write_df_to_adls(pull_request_milestone, 1, 'master_pull_request_id', 'pull_request_milestone', overwrite= True)





### pull_request_labels

In [0]:
# labels logic for pull_request_labels

pull_request_labels = master_pull_requests.filter(size(col('pull_request_labels')) > 0).select('*', explode('pull_request_labels').alias('pull_request_label'))



pull_request_labels = subDFWithMasterID(pull_request_labels, 'pull_request_label', 'pull_request')

write_df_to_adls(pull_request_labels, 10, 'master_pull_request_id', 'pull_request_labels', overwrite= True)




### pull_request_requested_reviewers

In [0]:

# master_pull_requests = master_pull_requests.drop('requested_reviewer')

pull_request_requested_reviewers = master_pull_requests.filter(size(col('pull_request_requested_reviewers')) > 0).select('*', explode('pull_request_requested_reviewers').alias('pull_request_requested_reviewer'))

pull_request_requested_reviewers = subDFWithMasterID(pull_request_requested_reviewers, 'pull_request_requested_reviewer', 'pull_request')


pull_request_requested_reviewers = add_index(pull_request_requested_reviewers, 'user', master_users_count).withColumnRenamed('user_id', 'user_temp_id')

pull_request_requested_reviewers_temp = pull_request_requested_reviewers.drop('master_pull_request_id')

master_users = master_users.union(pull_request_requested_reviewers_temp).drop_duplicates(subset = ['issue_assignee_id'])

master_users_count = master_users.count()

pull_request_requested_reviewers = (
            master_users.join(broadcast(pull_request_requested_reviewers), pull_request_requested_reviewers.pull_request_requested_reviewer_id == master_users.issue_assignee_id, how= 'right')
)

pull_request_requested_reviewers.cache()

pull_request_requested_reviewers = pull_request_requested_reviewers.withColumnRenamed('user_id', 'requested_reviewer_id').drop(*master_users.columns, *pull_request_requested_reviewers_temp.columns).fillna(0, subset = ['requested_reviewer_id'])

pull_request_requested_reviewers.unpersist()

write_df_to_adls(pull_request_requested_reviewers, 4, 'master_pull_request_id', 'pull_request_requested_reviewers', overwrite= True)




### pull_request_links 
- renaming columns from the nested link structure

In [0]:
# dealing with the links 

master_pull_requests = (
    master_pull_requests.withColumn('pull_request_link_comments', col('pull_request__links.comments.href'))
    .withColumn('pull_request_link_commits', col('pull_request__links.commits.href'))
    .withColumn('pull_request_link_html', col('pull_request__links.html.href'))
    .withColumn('pull_request_link_issue', col('pull_request__links.issue.href'))
    .withColumn('pull_request_link_review_comment', col('pull_request__links.review_comment.href'))
    .withColumn('pull_request_link_review', col('pull_request__links.review_comments.href'))
    .withColumn('pull_request_link_self', col('pull_request__links.self.href'))
    .withColumn('pull_request_link_statuses', col('pull_request__links.statuses.href')).drop('pull_request__links')
)


### pull_request_auto_merge

In [0]:
# auto merge 

pull_request_auto_merge = master_pull_requests.filter(col('pull_request_auto_merge').isNotNull())
pull_request_auto_merge = subDFWithMasterID(pull_request_auto_merge, 'pull_request_auto_merge', 'pull_request')
# pull_request_auto_merge = pull_request_auto_merge.withColumn('issue_milestone_id', lit(0))



# create milestone's creator tables and links that with milestone table
pull_request_auto_merge_enabled_by = pull_request_auto_merge.filter(col('pull_request_auto_merge_enabled_by').isNotNull())
pull_request_auto_merge_enabled_by = subDF(pull_request_auto_merge, 'pull_request_auto_merge_enabled_by')

pull_request_auto_merge_enabled_by = add_index(pull_request_auto_merge_enabled_by, 'interm', master_users_count).withColumnRenamed('event_id', 'pr_am_event_id')



master_users = master_users.union(pull_request_auto_merge_enabled_by).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()




pull_request_auto_merge = (
            pull_request_auto_merge.join(broadcast(master_users), pull_request_auto_merge.pull_request_auto_merge_enabled_by.id == master_users.issue_assignee_id, how= 'right')
 )
pull_request_auto_merge.cache()

pull_request_auto_merge= pull_request_auto_merge.withColumnRenamed('user_id', 'enabled_by_id').drop(*master_users.columns,'pull_request_auto_merge_enabled_by').fillna(0, subset = ['enabled_by_id'])

pull_request_auto_merge.unpersist()

write_df_to_adls(pull_request_auto_merge, 10, 'master_pull_request_id', 'pull_request_auto_merge', overwrite= True)





### pull_request_base

In [0]:
pull_request_base = master_pull_requests.filter(col('pull_request_base').isNotNull())
pull_request_base = subDFWithMasterID(pull_request_base, 'pull_request_base', 'pull_request')

pull_request_base = add_index(pull_request_base, 'master_pull_request_base')





### pull_request_base_repo

In [0]:
# create milestone's creator tables and links that with milestone table
pull_request_base_repo = pull_request_base.filter(col('pull_request_base_repo').isNotNull())
pull_request_base_repo = subDFWithMasterID(pull_request_base_repo, 'pull_request_base_repo', 'pull_request_base')
pull_request_base_repo = add_index(pull_request_base_repo, 'master_pull_request_repo_base')

pull_request_base_repo_owner = pull_request_base_repo.filter(col('pull_request_base_repo_owner').isNotNull())
pull_request_base_repo_owner= subDF(pull_request_base_repo, 'pull_request_base_repo_owner')

pull_request_base_repo_owner = add_index(pull_request_base_repo_owner, 'interm', master_users_count).withColumnRenamed('event_id', 'pr_base_repo_event_id')

master_users = master_users.union(pull_request_base_repo_owner).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

pull_request_base_repo = (
            master_users.join(broadcast(pull_request_base_repo), pull_request_base_repo.pull_request_base_repo_owner.id
            == master_users.issue_assignee_id, how= 'right')
 )

pull_request_base_repo.cache()

pull_request_base_repo= pull_request_base_repo.withColumnRenamed('user_id', 'owner_id').drop(*master_users.columns,'pull_request_base_repo_owner').fillna(0, subset = ['owner_id'])

pull_request_base_repo.unpersist()

DataFrame[owner_id: int, master_pull_request_repo_base_id: int, event_id: string, master_pull_request_base_id: int, pull_request_base_repo_allow_forking: boolean, pull_request_base_repo_archive_url: string, pull_request_base_repo_archived: boolean, pull_request_base_repo_assignees_url: string, pull_request_base_repo_blobs_url: string, pull_request_base_repo_branches_url: string, pull_request_base_repo_clone_url: string, pull_request_base_repo_collaborators_url: string, pull_request_base_repo_comments_url: string, pull_request_base_repo_commits_url: string, pull_request_base_repo_compare_url: string, pull_request_base_repo_contents_url: string, pull_request_base_repo_contributors_url: string, pull_request_base_repo_created_at: string, pull_request_base_repo_default_branch: string, pull_request_base_repo_deployments_url: string, pull_request_base_repo_description: string, pull_request_base_repo_disabled: boolean, pull_request_base_repo_downloads_url: string, pull_request_base_repo_events


### pull_request_base_topics

In [0]:
pull_request_base_repo_topics = pull_request_base_repo.filter(size(col('pull_request_base_repo_topics')) > 0)
pull_request_base_repo_topics = pull_request_base_repo_topics.select('*', explode('pull_request_base_repo_topics').alias('pull_request_base_repo_topic'))

pull_request_base_repo_topics = pull_request_base_repo_topics.select('master_pull_request_base_id', 'pull_request_base_repo_topic')

pull_request_base_repo = pull_request_base_repo.drop('pull_request_base_repo_topics')
pull_request_base = pull_request_base_repo.drop('pull_request_base_repo')



### pull_request_base_licesne 
  - pulling nested Licence columns
  - writing pull_request_base tables to the ADLS

In [0]:

pull_request_base_repo = (
    pull_request_base_repo.withColumn('pull_request_base_repo_license_key', col('pull_request_base_repo_license.key')).withColumn('pull_request_base_repo_license_name', col('pull_request_base_repo_license.name')).withColumn('pull_request_base_repo_license_node_id', col('pull_request_base_repo_license.node_id')).withColumn('pull_request_base_repo_license_spdx_id', col('pull_request_base_repo_license.spdx_id')).withColumn('pull_request_base_repo_license_url', col('pull_request_base_repo_license.url')).drop('pull_request_base_repo_license')
)


write_df_to_adls(pull_request_base, 10, 'master_pull_request_id', 'pull_request_base', overwrite= True)
write_df_to_adls(pull_request_base_repo, 10, 'master_pull_request_base_id', 'pull_request_base_repo', overwrite= True)
write_df_to_adls(pull_request_base_repo_topics, 10, 'master_pull_request_repo_base_id', 'pull_request_base_repo_topics', overwrite= True)




### pull_request_head

In [0]:
pull_request_head = master_pull_requests.filter(col('pull_request_head').isNotNull())
pull_request_head = subDFWithMasterID(pull_request_head, 'pull_request_head', 'pull_request')
pull_request_head = add_index(pull_request_head, 'master_pull_request_head')


### pull_request_head_repo

In [0]:
# create milestone's creator tables and links that with milestone table
pull_request_head_repo = pull_request_head.filter(col('pull_request_head_repo').isNotNull())
pull_request_head_repo = subDFWithMasterID(pull_request_head_repo, 'pull_request_head_repo', 'pull_request')
pull_request_head_repo = add_index(pull_request_head_repo, 'master_pull_request_head_repo')

pull_request_head_repo_owner = pull_request_head_repo.filter(col('pull_request_head_repo_owner').isNotNull())
pull_request_head_repo_owner= subDF(pull_request_head_repo, 'pull_request_head_repo_owner')

pull_request_head_repo_owner = add_index(pull_request_head_repo_owner, 'interm', master_users_count).withColumnRenamed('event_id', 'pr_head_repo_event_id')

master_users = master_users.union(pull_request_head_repo_owner).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

pull_request_head_repo = (
            master_users.join(broadcast(pull_request_head_repo), pull_request_head_repo.pull_request_head_repo_owner.id
            == master_users.issue_assignee_id, how= 'right') 
            )

pull_request_head_repo.cache()

pull_request_head_repo = pull_request_head_repo.withColumnRenamed('user_id', 'owner_id').drop(*master_users.columns,'pull_request_head_repo_owner').fillna(0, subset = ['owner_id'])

pull_request_head_repo.unpersist()

DataFrame[owner_id: int, master_pull_request_head_repo_id: int, event_id: string, master_pull_request_id: int, pull_request_head_repo_allow_forking: boolean, pull_request_head_repo_archive_url: string, pull_request_head_repo_archived: boolean, pull_request_head_repo_assignees_url: string, pull_request_head_repo_blobs_url: string, pull_request_head_repo_branches_url: string, pull_request_head_repo_clone_url: string, pull_request_head_repo_collaborators_url: string, pull_request_head_repo_comments_url: string, pull_request_head_repo_commits_url: string, pull_request_head_repo_compare_url: string, pull_request_head_repo_contents_url: string, pull_request_head_repo_contributors_url: string, pull_request_head_repo_created_at: string, pull_request_head_repo_default_branch: string, pull_request_head_repo_deployments_url: string, pull_request_head_repo_description: string, pull_request_head_repo_disabled: boolean, pull_request_head_repo_downloads_url: string, pull_request_head_repo_events_url:


### pull_request_head_repo_topics

In [0]:
pull_request_head_repo_topics = pull_request_head_repo.filter(size(col('pull_request_head_repo_topics')) > 0)
pull_request_head_repo_topics = pull_request_head_repo_topics.select('*', explode('pull_request_head_repo_topics').alias('pull_request_head_repo_topic'))

pull_request_head_repo_topics = pull_request_head_repo_topics.select('master_pull_request_head_repo_id', 'pull_request_head_repo_topic')

pull_request_head_repo = pull_request_head_repo.drop('pull_request_head_repo_topics')
pull_request_head = pull_request_head_repo.drop('pull_request_head_repo')



### pull_request_head_licesne 
  - pulling nested Licence columns
  - writing pull_request_head tables to the ADLS

In [0]:

pull_request_head_repo = (
    pull_request_head_repo.withColumn('pull_request_head_repo_license_key', col('pull_request_head_repo_license.key')).withColumn('pull_request_head_repo_license_name', col('pull_request_head_repo_license.name')).withColumn('pull_request_head_repo_license_node_id', col('pull_request_head_repo_license.node_id')).withColumn('pull_request_head_repo_license_spdx_id', col('pull_request_head_repo_license.spdx_id')).withColumn('pull_request_head_repo_license_url', col('pull_request_head_repo_license.url')).drop('pull_request_head_repo_license')
)



write_df_to_adls(pull_request_head, 10, 'master_pull_request_id', 'pull_request_head', overwrite= True)
write_df_to_adls(pull_request_head_repo, 10, 'master_pull_request_head_id', 'pull_request_head_repo', overwrite= True)
write_df_to_adls(pull_request_head_repo_topics, 10, 'master_pull_request_repo_head_id', 'pull_request_head_repo_topics', overwrite= True)


### pull_request_requested_teams 
- seperating out parent table form the requesed teams

In [0]:
pull_request_requested_teams = master_pull_requests.filter(size(col('pull_request_requested_teams')) > 0)
pull_request_requested_teams = pull_request_requested_teams.select('*', explode('pull_request_requested_teams').alias('pull_request_requested_team'))

pull_request_requested_teams = subDFWithMasterID(pull_request_requested_teams, 'pull_request_requested_team', 'pull_request')

pull_request_requested_teams = add_index(pull_request_requested_teams, 'master_pull_request_requested_teams')

pull_request_requested_teams_parent = pull_request_requested_teams.filter(col('pull_request_requested_team_parent').isNotNull())
# pull_request_requested_teams_parent = subDFWithMasterID(pull_request_requested_teams, 'pull_request_requested_team_parent', 'pull_request')

pull_request_requested_teams_parent= subDFWithMasterID(pull_request_requested_teams, 'pull_request_requested_team_parent', 'pull_request_requested_teams')

pull_request_requested_teams = pull_request_requested_teams.drop('pull_request_requested_team_parent')

write_df_to_adls(pull_request_requested_teams, 10, 'master_pull_request_id', 'pull_request_requested_teams', overwrite= True)
write_df_to_adls(pull_request_requested_teams_parent, 10, 'master_pull_request_requested_teams_id', 'pull_request_requested_teams_parent', overwrite= True)



### dropping pull_request cols

In [0]:

master_pull_requests = master_pull_requests.drop('pull_request_requested_teams', 'pull_request_head', 'pull_request_base', 'pull_request_auto_merge', 'pull_request_requested_reviewers', 'pull_request_labels', 'pull_request_milestone', 'pull_request_assignees', 'pull_request_reactions', 'pull_request__links')


### master_reviews

In [0]:
master_reviews = df.filter(col('payload.review').isNotNull())
master_reviews = rename_payload_cols(master_reviews, 'review')
master_reviews = add_index(master_reviews, 'master_reviews')


### Review users

In [0]:

review_users = master_reviews.filter(col('review_user').isNotNull())
review_users = subDF(review_users, 'review_user')

review_users = add_index(review_users, 'user', master_users_count).withColumnRenamed('event_id', 'review_event_id')

# Linking the review_users with the master user's table and addding new values to the count
master_users = master_users.union(review_users).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
master_reviews = (
            master_users.join(broadcast(master_reviews), master_reviews.review_user.id 
            == master_users.issue_assignee_id, how= 'right')
 )

master_reviews.cache()

master_reviews = master_reviews.withColumnRenamed('user_id', 'users_id').drop(*master_users.columns,'review_user').fillna(0, subset = ['users_id'])

master_reviews.unpersist()



### renaming review links

In [0]:

master_reviews = master_reviews.withColumn('review_link_html', col('review__links.html.href')).withColumn('review_link_pull_request', col('review__links.pull_request.href')).drop('review__links')

master_reviews = master_reviews.drop('review_reactions')



### Commit change event 

In [0]:
master_commit_comment_event = df.filter(col('type') == 'CommitCommentEvent').select('event_id' ,'payload.action')

master_commit_comment_event = (
    master_commit_comment_event.join(comments_master, 'event_id', 'left')
)

master_commit_comment_event.cache()
master_commit_comment_event = master_commit_comment_event.select('event_id', 'action', 'master_comment_id')
master_commit_comment_event.unpersist()

DataFrame[event_id: string, action: string, master_comment_id: int]

In [0]:
master_commit_comment_event = add_index(master_commit_comment_event, 'master_commit_comment_event')

master_commit_comment_event = master_commit_comment_event.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')


df = (
    df.join(master_commit_comment_event, master_commit_comment_event.master_event_id == df.event_id, "left")
    )

df.cache()

df = df.withColumnRenamed("master_commit_comment_event", 'commit_comment_event_id').drop(*master_commit_comment_event.columns).fillna(0, subset = ['commit_comment_event_id'])

df.unpersist()

write_df_to_adls(master_commit_comment_event, 10, 'master_commit_comment_event_id', 'master_commit_comment_event', overwrite= True)



### issues event table

In [0]:
master_issues_event = df.filter(col('type') == 'IssuesEvent').select('event_id', 'type' ,'payload.action')

master_issues_event = (
    master_issues_event.join(master_issues, 'event_id')
)
master_issues_event.cache()
master_issues_event = master_issues_event.select('event_id', 'action', 'master_issue_id')
master_issues_event.unpersist()


In [0]:
master_issues_event = add_index(master_issues_event, 'master_issues_event')

master_issues_event = master_issues_event.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')


df = (
    df.join(broadcast(master_issues_event),master_issues_event.master_event_id == df.event_id,"left")
    )

df.cache()

df = df.withColumnRenamed("master_issue_event_id", 'final_issue_event_id').drop(*master_issues_event.columns)
df = df.fillna(0, subset = ['final_issue_event_id'])

df.unpersist()


write_df_to_adls(master_issues_event, 10, 'master_issues_event_id', 'master_issues_event', overwrite= True)


### pull_request_event table

In [0]:
master_pull_requests.printSchema()

In [0]:
master_pull_request_event = df.filter(col('type') == 'PullRequestEvent').select('event_id', 'type' ,'payload.action', 'payload.number')

master_pull_request_event = (
    master_pull_request_event.join(master_pull_requests, 'event_id'))

master_pull_request_event.cache()
master_pull_request_event = master_pull_request_event.select('event_id', 'action', 'number', 'master_pull_request_id')
master_pull_request_event.unpersist()

In [0]:
master_pull_request_event = add_index(master_pull_request_event, 'master_pull_request_event')

master_pull_request_event = master_pull_request_event.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')


df = (
    df.join(broadcast(master_pull_request_event), master_pull_request_event.master_event_id == df.event_id, "left")
    )
df.cache()

df = df.withColumnRenamed("master_pull_request_event_id", 'pull_request_event_id').drop(*master_pull_request_event.columns).fillna(0, subset = ['pull_request_event_id'])

df.unpersist()

write_df_to_adls(master_pull_request_event, 10, 'master_pull_request_event_id', 'master_pull_request_event', overwrite= True)


### issue_comment

In [0]:
master_issues_comment_event = df.filter(col('type') == 'IssueCommentEvent').select('event_id', 'type' ,'payload.action')

master_issues_comment_event = (
    master_issues_comment_event.join(master_issues, 'event_id')
)
master_issues_comment_event.cache()

master_issues_comment_event = master_issues_comment_event.select('event_id', 'action', 'master_issue_id')

master_issues_comment_event = (
    master_comments.join(broadcast(master_issues_comment_event), 'event_id')
)

master_issues_comment_event = master_issues_comment_event.select('event_id', 'action', 'master_issue_id', 'master_comment_id')
master_issues_comment_event.unpersist()


In [0]:
master_issues_comment_event = add_index(master_issues_comment_event, 'master_issue_comment_event')

master_issues_comment_event = master_issues_comment_event.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')

# master_issues_comment_event.printSchema()

df = (
    df.join(master_issues_comment_event, master_issues_comment_event.master_event_id == df.event_id, "left")
    )

df.cache()

df = df.withColumnRenamed("master_issue_comment_event_id", 'issue_comment_event_id').drop(*master_issues_comment_event.columns)

df.unpersist()


write_df_to_adls(master_issues_comment_event, 10, 'issue_comment_event_id', 'master_issues_comment_event', overwrite= True)


### pull_request_comments table

In [0]:

master_pull_request_comment_event = df.filter(col('type') == 'PullRequestReviewCommentEvent').select('event_id', 'type' ,'payload.action')

master_pull_request_comment_event = (
    master_pull_request_comment_event.join(master_pull_requests, 'event_id')
)
master_pull_request_comment_event.cache()

master_pull_request_comment_event = master_pull_request_comment_event.select('event_id', 'action', 'master_pull_request_id')

master_pull_request_comment_event = (
    master_pull_request_comment_event.join(comments_master, 'event_id')
)

master_pull_request_comment_event = master_pull_request_comment_event.select('event_id', 'action', 'master_pull_request_id', 'master_comment_id')   

master_pull_request_comment_event.unpersist()


In [0]:
master_pull_request_comment_event = add_index(master_pull_request_comment_event, 'master_pull_request_comment_event')

master_pull_request_comment_event = master_pull_request_comment_event.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')


df = (
    df.join(master_pull_request_comment_event, master_pull_request_comment_event.master_event_id == df.event_id, "left")
    )

df.cache()

df = df.withColumnRenamed("master_pull_request_comment_event_id", 'pull_request_comment_event_id').drop(*master_pull_request_comment_event.columns).fillna(0, subset = ['pull_request_comment_event_id'])

df.unpersist()

write_df_to_adls(master_pull_request_comment_event, 10, 'master_pull_request_comment_event_id', 'master_pull_request_comment_event', overwrite= True)


### pull_request_review table

In [0]:

master_pull_request_review_event = df.filter(col('type') == 'PullRequestReviewEvent').select('event_id', 'type' ,'payload.action')

master_pull_request_review_event = (
    master_pull_request_review_event.join(master_pull_requests, 'event_id')
)
master_pull_request_review_event.cache()

master_pull_request_review_event = master_pull_request_review_event.select('event_id', 'action', 'master_pull_request_id')

master_pull_request_review_event = (
    master_pull_request_review_event.join(master_reviews, 'event_id')
)

master_pull_request_review_event.select('event_id', 'action', 'master_pull_request_id', 'master_reviews_id')   

master_pull_request_review_event.unpersist()

In [0]:
master_pull_request_review_event = add_index(master_pull_request_review_event, 'master_pull_request_review_event')

master_pull_request_review_event = master_pull_request_review_event.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')


df = (
    df.join(master_pull_request_review_event, master_pull_request_review_event.master_event_id == df.event_id, "left")
    )

df.cache()

df = df.withColumnRenamed("master_pull_request_review_event_id", 'pull_request_review_event_id').drop(*master_pull_request_review_event.columns).fillna(0, subset = ['pull_request_review_event_id'])

df.unpersist()

write_df_to_adls(master_pull_request_review_event, 10, 'master_pull_request_review_event_id', 'master_pull_request_review_event', overwrite= True)



### writing issues, comments, pull_requests and reviews 
- dropping the event_id because linked with our created clustered index

In [0]:
# todo 

master_issues = master_issues.drop('event_id')
comments_master = comments_master.drop('event_id')
master_pull_requests = master_pull_requests.drop('event_id')
master_reviews = master_reviews.drop('event_id')


write_df_to_adls(master_issues, 10, 'master_issues_id', 'master_issues', overwrite= True)
write_df_to_adls(comments_master, 10, 'comment_master_id', 'comments_master', overwrite= True)
write_df_to_adls(master_pull_requests, 10, 'master_pull_requests_id', 'master_pull_requests', overwrite= True)
write_df_to_adls(master_reviews, 10, 'master_reviews_id', 'master_reviews', overwrite= True)



# 1) replace milestone logic 
# 2) do comments logic and related tables
# 3) look at head and body tables for pull_request





### Fork Events

In [0]:
fork_events_df = df.where(df.type == 'ForkEvent')
fork_events_df = rename_payload_cols(fork_events_df, 'forkee')
fork_events_df = add_index(fork_events_df, 'master_fork_event')

In [0]:
fork_event_df = fork_events_df.withColumn('forkee_license_key', col('forkee_license.key')).withColumn('forkee_license_name', col('forkee_license.name')).withColumn('forkee_license_node_id', col('forkee_license.node_id')).withColumn('forkee_license_spdx_id', col('forkee_license.spdx_id')).withColumn('forkee_license_url', col('forkee_license.url')).drop('forkee_license')



fork_event_topics = fork_event_df.filter(size(col('forkee_topics')) > 0)
fork_event_topics = fork_event_topics.select('*', explode('forkee_topics').alias('forkee_topic'))


fork_event_topics = fork_event_topics.select('master_fork_event_id', 'forkee_topic')
fork_event_df = fork_event_df.drop('forkee_topics')


write_df_to_adls(fork_event_topics, 10, 'forkee_topic', 'fork_event_topics', True)


In [0]:
# Seperates Issues Users tables from the main issues table and joins to the master Users 
# and back to the fork_events table

forkee_owner = fork_events_df.filter(col('forkee_owner').isNotNull())
forkee_owner = subDF(forkee_owner, 'forkee_owner')

forkee_owner = add_index(forkee_owner, 'user', master_users_count).withColumnRenamed('event_id', 'issue_event_id')


# Linking the forkee_owner with the master user's table and addding new values to the count
master_users = master_users.union(forkee_owner).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()


# linking user_issues with fork_events table and setting null values to 0 
fork_events_df = (
            fork_events_df.join(master_users, fork_events_df.forkee_owner.id 
            == master_users.issue_assignee_id, how= 'left')
 )
 
fork_events_df.cache()

fork_events_df.withColumnRenamed('user_id', 'owner_id').drop(*master_users.columns, 'forkee_owner').fillna(0, subset = ['owner_id'])

fork_events_df.unpersist()



In [0]:
fork_event_df = fork_event_df.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp')


df = df.join(fork_event_df,fork_event_df.master_event_id == df.event_id,"left")

df.cache()

df = df.withColumnRenamed("master_fork_event_id", 'fork_event_id').drop(*fork_event_df.columns).fillna(0, subset = ['fork_event_id'])

df.unpersist()

write_df_to_adls(fork_event_df, 10, 'master_event_id', 'fork_event', True)


### watch_events

In [0]:
watch_events = df.select('event_id', 'payload.action').where(df.type == 'WatchEvent').withColumn('event_id', df.event_id.cast(LongType()))

# add index
watch_events = add_index(watch_events, 'watch_event', 1)

# add watch_events_df empty row
temp_watch_events = spark.createDataFrame([(0,  0, None)], watch_events.schema)

watch_events = temp_watch_events.union(watch_events)

watch_events.printSchema()


In [0]:
watch_events = watch_events.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp').withColumnRenamed('watch_event_id', 'master_watch_event_id')


df = df.join(watch_events,watch_events.master_event_id == df.event_id,"left")

df.cache()

df = df.withColumnRenamed("master_watch_event_id", 'watch_event_id').drop(*watch_events.columns).fillna(0, subset = ['watch_event_id'])

df.unpersist()

write_df_to_adls(watch_events, 10, 'master_event_id', 'watch_events', True)


### Member Events

In [0]:
member_event_df = df.select('event_id', 'payload.action', 'payload.member').where(df.type == 'MemberEvent')
member_event_df = add_index(member_event_df, 'member_event')

member_users = subDF(member_event_df, 'member')

member_users = add_index(member_users, 'user', master_users_count).withColumnRenamed('event_id', 'issue_event_id')

# Linking the issue_users with the master user's table and addding new values to the count
master_users = master_users.union(member_users).drop_duplicates(subset = ['issue_assignee_id'])

master_users_count = master_users.count()


# linking user_issues with master_issues table and setting null values to 0 
member_event_df = (
            member_event_df.join(master_users, member_event_df.member.id 
            == master_users.issue_assignee_id, how= 'left')
)


member_event_df.cache()

member_event_df = member_event_df.withColumnRenamed('user_id', 'members_id').drop(*master_users.columns).drop('member').fillna(0, 'members_id')

member_event_df = member_event_df.withColumnRenamed('event_id', 'master_event_id').withColumnRenamed('type', 'type_temp').withColumnRenamed('member_event_id', 'master_member_event_id')

member_event_df.unpersist()


# df = df.join(watch_events,watch_events.master_event_id == df.event_id,"left").withColumn("watch_event_id", col('master_watch_event_id')).drop(*watch_events.columns).fillna(0, subset = ['watch_event_id'])


df = df.join(member_event_df,member_event_df.master_event_id == df.event_id,"left")

df.cache()

df = df.withColumn("member_event_id", col('master_member_event_id')).drop(*member_event_df.columns).fillna(0, subset = ['member_event_id'])

df.unpersist()

write_df_to_adls(member_event_df, 10, 'master_event_id', 'member_event', True)


# Azim's 

### Create Event

In [0]:
df_ce = df.filter(df.type=="CreateEvent")
df_ce = add_index(df_ce,"create_event")
df_ce = subDFWithMasterID2(df_ce,"payload","create_event_id")
df_ce = drop_null_columns(df_ce)
tempToDrop = df_ce.select("event_id","create_event_id")
df = df.join(tempToDrop,"event_id","left")
df.cache()
df = df.fillna(0,subset="create_event_id")
df.unpersist()
df_ce = df_ce.drop("event_id")

In [0]:
write_df_to_adls(df_ce,10,"create_event_id", 'create_event',True)


In [0]:
#dbutils.notebook.exit("Create Event finished")
print("Create Event finished")

### Delete Event

In [0]:
df_del = df.filter(df.type=="DeleteEvent")
df_del = add_index(df_del,"delete_event")
df_del = drop_null_columns(subDFWithMasterID2(df_del,"payload","delete_event_id"))
tempToDrop = df_del.select("event_id","delete_event_id")
df = df.join(tempToDrop,"event_id","left")
df.cache()
df = df.fillna(0,subset="delete_event_id")
df.unpersist()
df_del = df_del.drop("event_id")

In [0]:
write_df_to_adls(df_del,10,"delete_event_id",'delete_event',True)

In [0]:
#TODO
# Save the df_del to the parquet file. This has details about all the DeleteEvents
#dbutils.notebook.exit("Delete Event finished")
print("Delete Event finished")

### Gollum Event

In [0]:
df_gol = df.filter(df.type=="GollumEvent")
df_gol = add_index(df_gol,"gollum_event")
df_gol = subDFWithMasterID2(df_gol,"payload","gollum_event_id")
df_gol = drop_null_columns(df_gol)
df_pages_gol = df_gol.select("event_id","gollum_event_id",explode("payload_pages").alias("payload_pages"))
df_pages_gol = add_index(df_pages_gol,"gollum_page")
df_pages_gol = subDFWithMasterID2(df_pages_gol,"payload_pages","gollum_page_id","gollum_event_id")
tempToDrop = df_gol.select("event_id","gollum_event_id")
df = df.join(tempToDrop,"event_id","left")
df.cache()
df = df.fillna(0,subset="gollum_event_id")
df.unpersist()
df_pages_gol = df_pages_gol.drop("event_id")
df_gol = df_gol.drop("event_id")

In [0]:
write_df_to_adls(df_pages_gol,10,"gollum_page_id","gollum_page",True)

In [0]:
#TODO
#Save the df df_pages_gol to a parquet file
#dbutils.notebook.exit("Gollum Event finished")
print("Gollum Event finished")

### Release Event

In [0]:
df_release = df.filter(col('type') == 'ReleaseEvent')
df_release = rename_payload_cols(df_release, 'release')
df_release = add_index(df_release, 'master_release')

In [0]:

df_release_assets = df_release.filter(size(col('release_assets')) > 0)
df_release_assets = df_release_assets.select('*', explode('release_assets').alias('release_asset'))

df_release_assets = subDFWithMasterID(df_release_assets, 'release_asset', 'release')

df_release_assets = add_index(df_release_assets, 'release_assets')




In [0]:
release_asset_uploader = df_release_assets.filter(col('release_asset_uploader').isNotNull())
release_asset_uploader = subDF(release_asset_uploader, 'release_asset_uploader')

release_asset_uploader = add_index(release_asset_uploader, 'user', master_users_count).withColumnRenamed('event_id', 'pr_event_id')

# Linking the release_asset_uploader with the master user's table and addding new values to the count
master_users = master_users.union(release_asset_uploader).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()

# linking user_issues with master_issues table and setting null values to 0 
df_release_assets = (
            df_release_assets.join(master_users, df_release_assets.release_asset_uploader.id 
            == master_users.issue_assignee_id, how= 'left')
 )

df_release_assets.cache()

df_release_assets =df_release_assets.withColumnRenamed('user_id', 'release_asset_uploader_id').drop(*master_users.columns,'release_asset_uploader').fillna(0, subset = ['release_asset_uploader_id'])

df_release_assets.unpersist()



In [0]:


df_release_mentions = df_release.filter(size(col('release_mentions')) > 0).select('*', explode('release_mentions').alias('release_mention'))

df_release_mentions = subDFWithMasterID(df_release_mentions, 'release_mention', 'release')



In [0]:
release_author = df_release.filter(col('release_author').isNotNull())
release_author = subDF(release_author, 'release_author')

release_author = add_index(release_author, 'user', master_users_count).withColumnRenamed('event_id', 'pr_event_id')

# Linking the release_author with the master user's table and addding new values to the count
master_users = master_users.union(release_author).drop_duplicates(subset = ['issue_assignee_id'])
master_users_count = master_users.count()
# linking user_issues with master_issues table and setting null values to 0 
df_release = (
            df_release.join(master_users, df_release.release_author.id 
            == master_users.issue_assignee_id, how= 'left'))
df_release.cache()

df_release = df_release.withColumnRenamed('user_id', 'release_author_id').drop(*master_users.columns,'release_author').fillna(0, subset = ['release_author_id'])

df_release.unpersist()

In [0]:
df_release = df_release.drop('release_mentions', 'release_assets', 'release_reactions')

In [0]:
write_df_to_adls(df_release,10,"master_event_id",'release_event',True)
write_df_to_adls(df_release_mentions,10, "master_release_id", 'release_mentions',True)
write_df_to_adls(df_release_assets,10,"master_release_id",'release_assets',True)



### Push Event 

In [0]:
df_push = df.filter(df.type=="PushEvent")
df_push = add_index(df_push,"push_event")
df_push = subDFWithMasterID2(df_push,"payload","push_event_id")
df_push = drop_null_columns(df_push)
df_commits = df_push.select("push_event_id","event_id",explode("payload_commits").alias("commits"))
df_commits = add_index(df_commits,"push_commit")
df_commits = subDFWithMasterID2(df_commits,"commits","push_commit_id","push_event_id")
df_commits = subDFWithMasterID2(df_commits,"commits_author","push_commit_id","commits_distinct","commits_message","commits_sha","commits_url","push_event_id")
df_push = df_push.drop("commits","distinct_size")
temp = df_push.select("push_event_id","event_id")
df = df.join(temp,"event_id","left")
df.cache()
df = df.fillna(0,subset="push_event_id")
df.unpersist()
df_commits.drop("event_id")
df_push.drop("event_id")

In [0]:
write_df_to_adls(df_push,10,"push_event_id","push_event",True)
write_df_to_adls(df_commits,10,"push_commit_id","push_commits",True)

# Public Event

In [0]:
df = df.withColumn("made_public_via_event",when(col("type") == "Public Event", True).otherwise(False))


Saving Main Table

In [0]:


df = df.drop('payload')

write_df_to_adls(df,10,"event_id",'master_events',True)
write_df_to_adls(master_users,10,"user_id",'master_users',True)
