# Project 2_ Team 3 _ Bronze to Silver Layer

## Table of contents:
--- 
- Read the bronze layer from the container
- Cleaning functions
- Flatten the bronze table
- Explore the Payload column separately
- ERD Tables:
  - Event_Table
  - repo_df
  - actor_df
  - organization_df
  - PushEvent_df
  - ReleaseEvent_df
  - CreateEvent_df
  - PullRequestEvent_df
  - PullRequestReviewEvent_df
  - CommitCommentEvent_df
  - PullRequestReviewCommentEvent_df
  - DeleteEvent_df
  - GollumEvent_df
  - MemberEvent_df
  - IssuesEvent_df
  - ForkEvent_df
  - IssueCommentEvent_df
  - WatchEvent


##  Import the necessary libraries/functions

In [0]:

import pyspark.sql.functions as F
from pyspark.sql.functions import col, explode, isnan, isnull, when, count, countDistinct, isnan, isnull, when, row_number, to_date, coalesce, to_timestamp, regexp_replace
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, DateType, FloatType, BooleanType
from pyspark.sql.window import Window



## Configure access to our Azure Storage account container:

In [0]:

contname = 'team3-project2' #azure storage account container
storage_acct_name = '20230821desa'
client_id = 'de4ff859-02b1-4e2f-9d16-b578fa03df4f' #aka: app id
tenant_id = '33da9f3f-4c1a-4640-8ce1-3f63024aea1d' #aka: directory id
service_credential = dbutils.secrets.get(scope="databricks-app-kv",key="databricks-application")


## Configure Spark to access Azure Storage securely using OAuth-based authentication:

In [0]:

spark.conf.set(f"fs.azure.account.auth.type.{storage_acct_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_acct_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_acct_name}.dfs.core.windows.net", f"{client_id}")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_acct_name}.dfs.core.windows.net", service_credential)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_acct_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

## Read the bronze layer from the container

In [0]:
brz_df = spark.read.parquet(f"abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/BronzeLayer")

## Cleaning functions

In [0]:
# 1- Check for missing values in a dataframe:
#--------------------------------------------
def check_Missing_Val(df):

    missing_values_counts = df.agg(
        *[count(when(isnull(col(c)) | isnan(col(c)), c)).alias(c) for c in df.columns]
    )
    missing_values_counts.show()



# 2- Count the number of duplicate rows in a dataframe:
#-----------------------------------------------------
def count_Duplicates(df): 
    duplicate_rows_count = df.groupBy(df.columns).count().filter(col("count") > 1).count()
    print(f"Number of duplicate rows: {duplicate_rows_count}")



# 3- Check if a column in df has only unique values:
#----------------------------------------------------
def check_Unique(df, colm):
    distinct_count = df.select(colm).distinct().count()
    total_count = df.select(colm).count()
    print(f"There are {total_count-distinct_count} duplicates in {colm}")
    

# show NaNs in columns
def show_NaNs(df, colm):
    df.where(isnan(col(colm))).show() #probably repalce NaNs with "missing", right split the ref column for location


In [0]:
check_Unique(brz_df, "id")

There are 10 duplicates in id


In [0]:
brz_df = brz_df.dropDuplicates(["id"])

In [0]:
# Used to check event types and their count
brz_df.groupBy("type").count().display()

type,count
PullRequestReviewEvent,29841
PushEvent,1303922
GollumEvent,4035
ReleaseEvent,13642
CommitCommentEvent,10727
CreateEvent,261401
PullRequestReviewCommentEvent,9020
IssueCommentEvent,88845
DeleteEvent,72318
IssuesEvent,35029


In [0]:
# Check the number of partitions
# brz_df.rdd.getNumPartitions()

In [0]:
# print the name of the columns
brz_df.columns

['actor', 'created_at', 'id', 'org', 'payload', 'public', 'repo', 'type']

In [0]:
# Rename the id column to event_id to differentiate it from other 'id' columns within the nested columns
brz_df = brz_df.withColumnRenamed("id", "event_id")

In [0]:
# Print the schema of the bronze layer
brz_df.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _links: struct (nullable = true)
 |    |    |    |-- html: struct (nullable = true)
 |    |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- pull_request: struc

## Flatten the bronze table

In [0]:
# flatten actor column

actor_flattened_df = brz_df.select(
    col("actor.*"),
    col("created_at"),
    col("event_id"),
    col("org"),
    col("payload"),
    col("public"),
    col("repo"),
    col("type")
)

actor_flattened_df = (actor_flattened_df.withColumnRenamed("avatar_url", "actor_avatar_url").withColumnRenamed("gravatar_id", "actor_gravatar_id").withColumnRenamed("id", "actor_id").withColumnRenamed("login", "actor_login").withColumnRenamed("url", "actor_url").withColumnRenamed("display_login", "actor_display_login"))   

# actor_flattened_df.printSchema()

In [0]:
# flatten org column

org_flattened_df = actor_flattened_df.select(
    col("actor_avatar_url"),
    col("actor_display_login"),
    col("actor_gravatar_id"),
    col("actor_id"),
    col("actor_login"),
    col("actor_url"),
    col("created_at"),
    col("event_id"),
    col("org.*"),
    col("payload"),
    col("public"),
    col("repo"),
    col("type"),
)

org_flattened_df = (org_flattened_df.withColumnRenamed("id", "org_id").withColumnRenamed("url", "org_url").withColumnRenamed("avatar_url", "org_avatar_url").withColumnRenamed("gravatar_id", "org_gravatar_id").withColumnRenamed("login", "org_login"))   

# org_flattened_df.printSchema()

In [0]:
# flatten repo column

flattened_df = org_flattened_df.select(
    col("actor_avatar_url"),
    col("actor_display_login"),
    col("actor_gravatar_id"),
    col("actor_id"),
    col("actor_login"),
    col("actor_url"),
    col("created_at"),
    col("event_id"),
    col("org_avatar_url"),
    col("org_gravatar_id"),
    col("org_id"),
    col("org_login"),
    col("org_url"),
    col("payload"),
    col("public"),
    col("repo.*"),
    col("type"),
)

flattened_df = (flattened_df.withColumnRenamed("struct", "repo_struct").withColumnRenamed("id", "repo_id").withColumnRenamed("name", "repo_name").withColumnRenamed("url", "repo_url"))
 

# flattened_df.printSchema()

In [0]:
flattened_df.columns

['actor_avatar_url',
 'actor_display_login',
 'actor_gravatar_id',
 'actor_id',
 'actor_login',
 'actor_url',
 'created_at',
 'event_id',
 'org_avatar_url',
 'org_gravatar_id',
 'org_id',
 'org_login',
 'org_url',
 'payload',
 'public',
 'repo_id',
 'repo_name',
 'repo_url',
 'type']

In [0]:
# Public column in the table can be dropped because it only has one value: True.
# flattened_df.select("public").distinct().collect()

In [0]:
flattened_df = flattened_df.drop("public")

## Explore the Payload column separately

In [0]:
payload_df = flattened_df.select("payload.*")
# payload_df.printSchema()

In [0]:
payload_df.columns

['action',
 'before',
 'comment',
 'commits',
 'description',
 'distinct_size',
 'forkee',
 'head',
 'issue',
 'master_branch',
 'member',
 'number',
 'pages',
 'pull_request',
 'push_id',
 'pusher_type',
 'ref',
 'ref_type',
 'release',
 'review',
 'size']

In [0]:
# Create a DataFrame to display null counts
# null_counts_df = spark.createDataFrame([(col, count) for col, count in zip(payload_df.columns, null_counts)], ["Column", "Null_Count"])
# null_counts_df.show()

# ERD Tables

## Event_Table

In [0]:
Event_Table = flattened_df.select(
col("event_id"),
col("created_at"),
col("actor_id"),
col("org_id"),
col("type"),
col("repo_id")
)

# Event_Table.printSchema()

In [0]:
# Event_Table.limit(10).display()

In [0]:
# check_Missing_Val(Event_Table)   # 

In [0]:
# Event_Table.count()

In [0]:

# fillnas with 0
Event_Table = Event_Table.fillna(0, subset=["org_id"])

In [0]:
# Event_Table.printSchema()

In [0]:
# check_Unique(Event_Table, "event_id")

In [0]:
#Convert the "created_at" column from string type to timestamp:

Event_Table = Event_Table.withColumn("created_at", col("created_at").cast("timestamp"))
# print(Event_Table.schema["created_at"].dataType)

In [0]:
# Event_Table.limit(10).display()

## Repo_df

In [0]:
repo_df = flattened_df.select('repo_id','repo_name','repo_url')
# repo_df.printSchema()

In [0]:
# repo_df.limit(10).display()

In [0]:
repo_df = repo_df.dropDuplicates()

In [0]:
# check_Missing_Val(repo_df)

In [0]:
# check_Unique(repo_df, "repo_id")

## actor_df

In [0]:
actor_df = actor_flattened_df.select(
    col("actor_id").alias("actor_id"),
    col("actor_login").alias("login"),
    col("actor_display_login").alias("display_login"),
    col("actor_gravatar_id").alias("gravatar_id"),
    col("actor_url").alias("url"),
    col("actor_avatar_url").alias("avatar_url")
)

# actor_df.printSchema()

In [0]:
# check_Missing_Val(actor_df)

In [0]:
# check_Unique(actor_df, "actor_id")

In [0]:
actor_df.dropDuplicates()

DataFrame[actor_id: bigint, login: string, display_login: string, gravatar_id: string, url: string, avatar_url: string]

## organization_df

In [0]:
organization_df = org_flattened_df.select(
    col("org_id").alias("org_id"),
    col("org_login").alias("login"),
    col("org_gravatar_id").alias("gravatar_id"),
    col("org_url").alias("url"),
    col("org_avatar_url").alias("avatar_url")
)

# organization_df.printSchema()

In [0]:
 # check_Missing_Val(organization_df)

In [0]:
# check_Unique(organization_df, "org_id")

In [0]:
organization_df.dropDuplicates()

DataFrame[org_id: bigint, login: string, gravatar_id: string, url: string, avatar_url: string]

## PushEvent_df

In [0]:
# One or more commits are pushed to a repository branch or tag.
PushEvent_df = flattened_df.where(col("type")=='PushEvent')

PushEvent_df = PushEvent_df.select("event_id", "payload.push_id", "payload.distinct_size", "payload.ref", explode("payload.commits.message").alias("commits_message"))
PushEvent_df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- push_id: long (nullable = true)
 |-- distinct_size: long (nullable = true)
 |-- ref: string (nullable = true)
 |-- commits_message: string (nullable = true)



In [0]:
check_Missing_Val(PushEvent_df)



+--------+-------+-------------+---+---------------+
|event_id|push_id|distinct_size|ref|commits_message|
+--------+-------+-------------+---+---------------+
|       0|      0|            0|  0|              0|
+--------+-------+-------------+---+---------------+



In [0]:
# show_NaNs(PushEvent_df, "commits_message")

In [0]:
PushEvent_df = PushEvent_df.fillna("missing", subset=["commits_message"])

In [0]:

# Count the number of duplicate rows in PushEvent_df
# count_Duplicates(PushEvent_df)


In [0]:

PushEvent_df = PushEvent_df.dropDuplicates()

In [0]:
# check_Unique(PushEvent_df, "event_id")


In [0]:
# create a primary id column with the range of rows
PushEvent_df = PushEvent_df.withColumn("push_id_PK", row_number().over(Window.orderBy(F.monotonically_increasing_id())))

columns = PushEvent_df.columns
new_columns = ["push_id_PK"] + [col_name for col_name in columns if col_name != "push_id_PK"]
PushEvent_df = PushEvent_df.select(*new_columns)


# PushEvent_df.limit(10).display()

In [0]:
PushEvent_df.limit(10).display()

# noticed that duplicates only have different commit messages and some are in spanish

push_id_PK,event_id,push_id,distinct_size,ref,commits_message
1,19541174443,8732292121,1,refs/heads/feature,Updated file
2,19541174699,8732292316,1,refs/heads/master,Update list of player scores
3,19541174735,8732292363,1,refs/heads/main,Update del 2021-12-31_20:00
4,19541174889,8732292474,1,refs/heads/source,refactor(Core/Misc): fabs() to std::fabs() (#9790) - prefer std functions over C functions
5,19541175484,8732292917,1,refs/heads/master,tizi ios
6,19541175585,8732293007,1,refs/heads/master,tizi ios
7,19541175598,8732293008,1,refs/heads/main,readme line-break fix
8,19541175685,8732293087,1,refs/heads/master,tizi ios
9,19541175706,8732293106,5,refs/heads/master,Fix shell syntax in build_wheel (#11873) Follow up to #11872 The cross repo split makes this a bit of a pain to test. Co-authored-by: hauntsaninja <>
10,19541175706,8732293106,5,refs/heads/master,"Fixes regression with `__module__` and similar non-final `Enum` names, refs #11820 (#11823)"


## ReleaseEvent_df

In [0]:
ReleaseEvent_df = flattened_df.where(col("type")=='ReleaseEvent')
ReleaseEvent_df = ReleaseEvent_df.select("event_id", "payload.action", "payload.release.id") 
                                        
# ReleaseEvent_df.printSchema()

In [0]:
ReleaseEvent_df = ReleaseEvent_df.dropDuplicates(["id"])

In [0]:
ReleaseEvent_df = ReleaseEvent_df.withColumnRenamed("id", "release_id")

In [0]:
# check_Missing_Val(ReleaseEvent_df)

In [0]:
# ReleaseEvent_df.limit(10).display()

In [0]:
# check_Unique(ReleaseEvent_df, "event_id")

## CreateEvent_df

In [0]:
# A Git branch or tag is created
CreateEvent_df = flattened_df.where(col("type")=='CreateEvent')

In [0]:
# CreateEvent_df.count()

In [0]:
CreateEvent_df = CreateEvent_df.select("event_id", "payload.ref", "payload.ref_type", "payload.master_branch")
# CreateEvent_df.printSchema()


In [0]:
# check_Missing_Val(CreateEvent_df)

In [0]:
# count_Duplicates(CreateEvent_df)

In [0]:
CreateEvent_df = CreateEvent_df.fillna("missing", subset=["ref"])

In [0]:
# CreateEvent_df.limit(10).display()

In [0]:
# CreateEvent_df.where(isnan(col("ref"))).show()

In [0]:
# count_Duplicates(CreateEvent_df)

In [0]:
CreateEvent_df = CreateEvent_df.dropDuplicates()

In [0]:
# check_Unique(CreateEvent_df, "event_id")

## PullRequestEvent_df

In [0]:
# Activity related to pull requests (opened, edited, closed, etc.)
PullRequestEvent_df = flattened_df.where(col("type")=='PullRequestEvent')

In [0]:
# PullRequestEvent_df.count()

In [0]:
# PullRequestEvent_df.printSchema()

In [0]:
# PullRequestEvent_df.select("payload.pull_request.*").limit(10).display()

In [0]:
#
PullRequestEvent_df = PullRequestEvent_df.select("event_id", "payload.action", "payload.pull_request.additions", "payload.pull_request.author_association", "payload.pull_request.deletions", "payload.pull_request.changed_files", "payload.pull_request.number", "payload.pull_request.id")
# PullRequestEvent_df.printSchema()

In [0]:
PullRequestEvent_df = PullRequestEvent_df.withColumnRenamed("id", "pull_request_id")

In [0]:
# PullRequestEvent_df.limit(10).display()

In [0]:
# check_Missing_Val(PullRequestEvent_df)

In [0]:
# count_Duplicates(PullRequestEvent_df)

In [0]:
# check_Unique(PullRequestEvent_df, "event_id")

In [0]:
PullRequestEvent_df.dropDuplicates()

DataFrame[event_id: string, action: string, additions: bigint, author_association: string, deletions: bigint, changed_files: bigint, number: bigint, pull_request_id: bigint]

In [0]:
#payload_pullreq_df.display()

In [0]:
#
# PullRequestReviewEvent_df = flattened_df.where(col("type")=='PullRequestReviewEvent')
# PullRequestReviewEvent_df.limit(10).display()


## CommitCommentEvent_df

In [0]:
# A commit comment is created.
CommitCommentEvent_df = flattened_df.where(col("type")=='CommitCommentEvent')

In [0]:
#CommitCommentEvent_df.limit(5).display()

In [0]:
# CommitCommentEvent_df.select("payload.*").limit(10).display()

In [0]:
# CommitCommentEvent_df.printSchema()

In [0]:
CommitCommentEvent_df = CommitCommentEvent_df.select("event_id", "payload.comment.*")


In [0]:
# CommitCommentEvent_df.limit(5).display()

In [0]:
# CommitCommentEvent_df.printSchema()

In [0]:
CommitCommentEvent_df = CommitCommentEvent_df.select(col("event_id"), col("body"), col("commit_id"), col("created_at").alias("commit_created_at"), col("user.id").alias("user_id"))

CommitCommentEvent_df = CommitCommentEvent_df.withColumn("commit_created_at", col("commit_created_at").cast("timestamp"))


In [0]:
# CommitCommentEvent_df.printSchema()

In [0]:
# CommitCommentEvent_df.limit(10).display()

In [0]:

#check_Missing_Val(CommitCommentEvent_df.drop("commit_created_at"))

In [0]:
# further flattening needed on comment

# CommitCommentEvent_df = CommitCommentEvent_df.select("event_id", "payload.action", "payload.comment")
# CommitCommentEvent_df.printSchema()

In [0]:
# check_Unique(CommitCommentEvent_df, "event_id")

In [0]:
CommitCommentEvent_df = CommitCommentEvent_df.withColumn("commit_id_PK", row_number().over(Window.orderBy(F.monotonically_increasing_id())))
# CommitCommentEvent_df = CommitCommentEvent_df.select("commit_id_PK", *CommitCommentEvent_df.columns)
columns = CommitCommentEvent_df.columns
new_columns = ["commit_id_PK"] + [col_name for col_name in columns if col_name != "commit_id_PK"]
CommitCommentEvent_df = CommitCommentEvent_df.select(*new_columns)
# CommitCommentEvent_df.limit(10).display()


In [0]:
# PublicEvent_df.limit(10).display()

In [0]:
# We will not be using the PublicEvent_df since it mostly contains nulls in all columns.

## DeleteEvent_df

In [0]:
# A Git branch or tag is deleted.
DeleteEvent_df = flattened_df.where(col("type")=="DeleteEvent").select("event_id", "payload.*")

In [0]:
# All the columns are null except: event_id, pusher_type, ref, and ref_type. The pusher_type column has one distinct value only: user, so it is not helpful to keep. ref_type has two distinct values: branch, tag. Therefore, only event_id, ref, and ref_type should be kept.
DeleteEvent_df = DeleteEvent_df.select("event_id","ref","ref_type")  
# DeleteEvent_df.limit(10).display()

In [0]:
# Check for missing values in DeleteEvent_df:
# check_Missing_Val(DeleteEvent_df)

In [0]:
# Count the number of duplicate rows in DeleteEvent_df
# count_Duplicates(DeleteEvent_df)

In [0]:
DeleteEvent_df = DeleteEvent_df.dropDuplicates()

In [0]:
# Check if event_id column has only unique values:
# check_Unique(DeleteEvent_df, "event_id")

## GollumEvent_df

In [0]:
# A wiki page is created or updated
GollumEvent_df = flattened_df.where(col("type")=="GollumEvent").select("event_id", "payload.*")
# GollumEvent_df.printSchema() 

In [0]:
# Only event_id and pages columns have value, and pages is a list of page objects, thus, using explode to create a row for each page
GollumEvent_df = GollumEvent_df.select("event_id",explode("pages").alias("page")).select("event_id","page.*")
# GollumEvent_df.limit(10).display()

In [0]:
# The html url's endpoint is the page_name (transitive dependency)

In [0]:
# Check for missing values in GollumEvent_df:
# check_Missing_Val(GollumEvent_df)                # Drop Summary column as it contains mostly nulls.

In [0]:
# Check if sha is unique:
# check_Unique(GollumEvent_df,"sha")

In [0]:
# The summary, sha and html_url columns can be dropped.
GollumEvent_df = GollumEvent_df.select('event_id', 'action', 'page_name', 'title')
# GollumEvent_df.limit(10).display()

In [0]:
# Count the number of duplicate rows in GollumEvent_df
# count_Duplicates(GollumEvent_df)

In [0]:
GollumEvent_df = GollumEvent_df.dropDuplicates()

In [0]:
# Check if event_id column has only unique values:
# check_Unique(GollumEvent_df, "event_id")

In [0]:
# Adding a row number column as primary key, since event_id is not unique
GollumEvent_df = GollumEvent_df.withColumn("gollum_id_PK", row_number().over(Window.orderBy(F.monotonically_increasing_id())))
columns = GollumEvent_df.columns
new_columns = ["gollum_id_PK"] + [col_name for col_name in columns if col_name != "gollum_id_PK"]
GollumEvent_df = GollumEvent_df.select(*new_columns)
# GollumEvent_df.limit(10).display()

## MemberEvent_df

In [0]:
# Activity related to repository collaborators
MemberEvent_df = flattened_df.where(col("type")=="MemberEvent").select("event_id", "payload.*")

In [0]:
# The only columns that have value in them are: event_id, action, and member (which is nested and needs to be expanded):
MemberEvent_df = MemberEvent_df.select("event_id", "action", "member.*")
# MemberEvent_df.limit(10).display()

In [0]:
MemberEvent_df = MemberEvent_df.select("event_id","action", col("id").alias("member_id"), col("login").alias("member_login"),
                                       col("site_admin").alias("member_site_admin"), col("type").alias("member_type"))

In [0]:
MemberEvent_df = MemberEvent_df.dropDuplicates()

In [0]:
# MemberEvent_df.limit(10).display()

In [0]:
# Check distinct action types:
# MemberEvent_df.select("action").distinct().show()

In [0]:
# Check for missing values:
# check_Missing_Val(MemberEvent_df.drop("member_site_admin"))

In [0]:
# Check if event_id is unique:
# check_Unique(MemberEvent_df, "event_id")

## IssuesEvent_df

In [0]:
# Activity related to an issue.
IssuesEvent_df = flattened_df.where(col("type")=="IssuesEvent").select("event_id", "payload.*")

In [0]:
# Non null columns are: event_id, action, and issue (which is nested)
IssuesEvent_df = IssuesEvent_df.select("event_id","action", "issue.*")
# IssuesEvent_df.limit(10).display()

In [0]:
IssuesEvent_df = IssuesEvent_df.select("event_id", "action", col("assignee.id").alias("assignee_id"), "closed_at", col("user.id").alias("user_id"),
                      col("created_at").alias("issue_created_at"), col("id").alias("issue_id"))
# IssuesEvent_df.limit(10).display()

IssuesEvent_df = IssuesEvent_df.withColumn("issue_created_at", col("issue_created_at").cast("timestamp"))


In [0]:
# Check for missing values:
# check_Missing_Val(IssuesEvent_df.drop("issue_created_at"))

In [0]:
# Check the total number of rows:
# IssuesEvent_df.count()

In [0]:
# Dropping the assignee_id column since it mostly contains nulls:
IssuesEvent_df = IssuesEvent_df.drop("assignee")

In [0]:
# Check for duplicate rows:
# count_Duplicates(IssuesEvent_df)

In [0]:
# Check if event_id is unique:
# check_Unique(IssuesEvent_df, "event_id")

## ForkEvent_df

In [0]:
# A user forks a repository.
ForkEvent_df = flattened_df.where(col("type")=="ForkEvent").select("event_id", "payload.*")

In [0]:
# Only keep event_id column and forkee (nested) column, as all other columns are null.
ForkEvent_df = ForkEvent_df.select("event_id", "forkee.*")
# ForkEvent_df.limit(10).display()

In [0]:
ForkEvent_df = ForkEvent_df.select("event_id", col("id").alias("fork_id"), col("created_at").alias("fork_created_at"), "fork", "forks","forks_count","language", "open_issues_count", col("owner.id").alias("owner_id"), "default_branch")

ForkEvent_df = ForkEvent_df.withColumn("fork_created_at", col("fork_created_at").cast("timestamp"))

In [0]:
# ForkEvent_df.limit(10).display()

In [0]:
check_Missing_Val(ForkEvent_df.drop("fork", "fork_created_at"))

+--------+-------+-----+-----------+--------+-----------------+--------+--------------+
|event_id|fork_id|forks|forks_count|language|open_issues_count|owner_id|default_branch|
+--------+-------+-----+-----------+--------+-----------------+--------+--------------+
|       0|      0|    0|          0|   33686|                0|       0|             0|
+--------+-------+-----+-----------+--------+-----------------+--------+--------------+



In [0]:
# ForkEvent_df.count()    # The language column is all nulls?

In [0]:
# count_Duplicates(ForkEvent_df)

In [0]:
# Find the top 5 default branch names based on their count: "main" and "master" should be the most recurring ones.
# df_grouped = ForkEvent_df.groupBy("default_branch").agg(count("*").alias("count"))
# df_sorted = df_grouped.sort("count", ascending=False)
# df_sorted.limit(10).display()

In [0]:
# ForkEvent_df.select("open_issues_count").distinct().show()   # open_issues_count can be dropped since it only contain one value: 0

In [0]:
# ForkEvent_df.select("fork").distinct().show()   # The fork column only contains one value: true, so it can be dropped

In [0]:
# ForkEvent_df.select("forks").distinct().show()   # The forks column only contains one value: true, which makes sense because the df only contains fork events, so it can be dropped

In [0]:
ForkEvent_df = ForkEvent_df.drop("language", "open_issues_count", "fork", "forks")


In [0]:
# check_Unique(ForkEvent_df, "event_id")

In [0]:
ForkEvent_df.limit(10).display()

event_id,fork_id,fork_created_at,forks_count,owner_id,default_branch
19541181382,443443572,2022-01-01T00:01:17.000+0000,0,72463938,main
19541207342,443443974,2022-01-01T00:05:14.000+0000,0,127967,main
19541207607,443443977,2022-01-01T00:05:16.000+0000,0,16996420,main
19541210000,443444034,2022-01-01T00:05:40.000+0000,0,30208534,main
19541210104,443444036,2022-01-01T00:05:41.000+0000,0,19158291,main
19541216972,443444150,2022-01-01T00:06:58.000+0000,0,86165899,main
19541221476,443444234,2022-01-01T00:07:51.000+0000,0,80707194,main
19541230152,443444429,2022-01-01T00:09:46.000+0000,0,29532712,main
19541244423,443444733,2022-01-01T00:13:08.000+0000,0,16312458,main
19541258091,443445150,2022-01-01T00:16:53.000+0000,0,94422273,main


## IssueCommentEvent_df

In [0]:
# Activity related to an issue or pull request comment.
IssueCommentEvent_df = flattened_df.where(col("type")=="IssueCommentEvent").select("event_id", "payload.*")
#IssueCommentEvent_df.limit(10).display()

In [0]:
# The only non-null columns in Payload are : event_id, action, comment and issue, selected below.
# First, unnesting the nested issue column:
IssueCommentEvent_df = IssueCommentEvent_df.select("event_id", "action", "issue.*", "comment")
#IssueCommentEvent_df.limit(10).display()

In [0]:
# Selecting event_id, action, as well as specific columns from issue and also unnesting the comment column:
IssueCommentEvent_df = IssueCommentEvent_df.select("event_id", "action", col("created_at").alias("issue_created_at"), 
                                                   col("id").alias("issue_id"), col("user.id").alias("issue_user_id"), "comment.*" )
IssueCommentEvent_df = IssueCommentEvent_df.withColumn("issue_created_at", col("issue_created_at").cast("timestamp"))

IssueCommentEvent_df = IssueCommentEvent_df.select("event_id", "action", "issue_created_at", "issue_id", "issue_user_id",
                                                   col("created_at").alias("comment_created_at"), col("id").alias("comment_id"), 
                                                   col("user.id").alias("comment_user_id"))
IssueCommentEvent_df = IssueCommentEvent_df.withColumn("comment_created_at", col("comment_created_at").cast("timestamp"))                                                                                      

In [0]:
#IssueCommentEvent_df.limit(10).display()

In [0]:
# Check for duplicate rows:
# count_Duplicates(IssueCommentEvent_df)

In [0]:
# Check for missing values:
# check_Missing_Val(IssueCommentEvent_df.drop("issue_created_at", "comment_created_at"))

In [0]:
# Check the distinct action types:
# IssueCommentEvent_df.select("action").distinct().show()

In [0]:
# Check if event_id is unique:
# check_Unique(IssueCommentEvent_df, "event_id")

In [0]:
# IssueCommentEvent_df.columns

## WatchEvent

In [0]:
WatchEvent_df = flattened_df.where(col("type")=="WatchEvent").select("event_id", "payload.*")

In [0]:
WatchEvent_df = WatchEvent_df.select("event_id", "action")
# WatchEvent_df.limit(10).display()

## PullRequestReviewCommentEvent

In [0]:
PullRequestReviewCommentEvent_df = flattened_df.select(
    col("event_id"),
    col("payload.pull_request.review_comments"),
    col("payload.pull_request.id").alias("pull_request_id"),
    col("payload.pull_request.base.repo.created_at").alias("repo_created_at"),
    col("payload.pull_request.base.repo.id").alias("_repo_id"),
    col("payload.pull_request.base.repo.language").alias("repo_language")
)

PullRequestReviewCommentEvent_df = PullRequestReviewCommentEvent_df.withColumn("repo_created_at", col("repo_created_at").cast("timestamp")) 
# Display the new DataFrame
PullRequestReviewCommentEvent_df.printSchema()


root
 |-- event_id: string (nullable = true)
 |-- review_comments: long (nullable = true)
 |-- pull_request_id: long (nullable = true)
 |-- repo_created_at: timestamp (nullable = true)
 |-- _repo_id: long (nullable = true)
 |-- repo_language: string (nullable = true)



In [0]:
# a lot of nulls on repo_id, fill repo_id from event_id join
joined_with_event = PullRequestReviewCommentEvent_df.join(
    Event_Table.select("event_id", "repo_id"),
    on="event_id",
    how="left"
)

In [0]:
# Fill missing values in repo_id using coalesce
# if theres no value, fill it in with repo_id from event_table
filled_df = joined_with_event.withColumn(
    "repo_id",
    coalesce(joined_with_event["repo_id"], Event_Table["repo_id"])
)

In [0]:
PullRequestReviewCommentEvent_df = filled_df.drop("_repo_id")

In [0]:
#Fill nulls in review_comments column with 0 
PullRequestReviewCommentEvent_df = PullRequestReviewCommentEvent_df.fillna({'review_comments': '0'})

In [0]:
#Fill nulls in repo_language with 'N/S' (Not Specified)
PullRequestReviewCommentEvent_df = PullRequestReviewCommentEvent_df.fillna({'repo_language': 'N/S'})

In [0]:
# Not within the 85% threshold
PullRequestReviewCommentEvent_df = PullRequestReviewCommentEvent_df.drop("pull_request_id", "repo_created_at")

In [0]:
# PullRequestReviewCommentEvent_df.count()

In [0]:
# check_Missing_Val(PullRequestReviewCommentEvent_df.drop("repo_created_at"))

In [0]:
#see if event_id is null in pullrequestevent to compare to reviewcommentevent
# PullRequestEvent_df.filter(col("event_id") == "19541175619").show()

In [0]:
PullRequestReviewCommentEvent_df.limit(5).display()

event_id,review_comments,repo_language,repo_id
19541174443,0,N/S,436559311
19541174699,0,N/S,379863144
19541174735,0,N/S,442018948
19541174889,0,N/S,420826190
19541175484,0,N/S,138681984


## Write the Silver Layer:


# Tables listed

In [0]:
table_string_names = ["Event_Table",
"repo_df",
"actor_df",
"organization_df",
"PushEvent_df",
"ReleaseEvent_df",
"CreateEvent_df",
"PullRequestEvent_df",
"CommitCommentEvent_df",
"PullRequestReviewCommentEvent_df",
"DeleteEvent_df",
"GollumEvent_df",
"MemberEvent_df",
"IssuesEvent_df",
"ForkEvent_df",
"IssueCommentEvent_df",
"WatchEvent"]

In [0]:
# multiply by 0.242 for partition number (multiply by 31, divide by 128)
table_names = [Event_Table,               # 290.5  MB (70.3 part)
repo_df,                                  #  68.6  MB (16.6)                
actor_df,                                 # 207.9  MB (50.3)
organization_df,                          #  29.4  MB (7.1)
PushEvent_df,                             # 292.0  MB (70.6)
ReleaseEvent_df,                          #   0.52 MB (0.126)
CreateEvent_df,                           #  15.4  MB (3.73)
PullRequestEvent_df,                      #  12.8  MB (3.1)
CommitCommentEvent_df,                    #  11.2  MB (2.71)
PullRequestReviewCommentEvent_df,         #  36.2  MB (8.7)  
DeleteEvent_df,                           #  4.28 MB (1.04 part)
GollumEvent_df,                           #  0.53 MB (0.128 part)
MemberEvent_df,                           #  0.37 MB (0.09 part)
IssuesEvent_df,                           #  4.15 MB (1.0043)
ForkEvent_df,                             #  2.96 MB (3.08 part) 
IssueCommentEvent_df,                     # 12.74 MB (3.08 part)
WatchEvent_df]                            #  2.52 MB (0.61 part)

partition_size = [71, 17, 51, 8, 71, 1, 4, 4, 3, 9, 2, 1, 1, 2, 4, 4, 1]


In [0]:
# Write the tables one at a time based on a specific repartition number to the silver layer container
for indx, table_name in enumerate(table_names):
    df_name = table_name
    df_name.repartition(partition_size[indx]).write.format("parquet").mode("overwrite").save(f"abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/SilverLayer/{table_string_names[indx]}")

    


## Confirm successful write to the datalake

In [0]:
# After writing files to the datalake, run:
display(dbutils.fs.ls(f"abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/"))