# README
This Notebook performs the necessary transformations to get the data from bronze into 3rd Normal Form then writes it to our Data Lake

# Imports

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import Window

# Importing bronze layer

In [0]:
# Connection settings for the Azure Data Lake Storage account used by this notebook.
contname = 'team5-project2' # Azure storage account container

storage_acct_name = '20230821desa' # storage account name; combined with contname to build the abfss:// paths

client_id = 'de4ff859-02b1-4e2f-9d16-b578fa03df4f' # aka: app id

tenant_id = '33da9f3f-4c1a-4640-8ce1-3f63024aea1d' # aka: directory id

# The credential is read from a Databricks secret scope instead of being hard-coded.
# NOTE(review): client_id, tenant_id and service_credential are not referenced again in the
# visible notebook - confirm where the storage authentication is actually configured.
service_credential = dbutils.secrets.get(scope="databricks-app-kv",key="databricks-application")

In [0]:
# Read every day_created=* folder of the bronze layer.
#
# Parquet files carry their own schema, so the "header" and "inferColumnTypes" options
# (which are not parquet reader options) were removed.
#
# BRONZE_ROW_LIMIT: set to an integer to work on a small sample while developing.
# Leave it as None for a real run, otherwise every silver table written at the end of the
# notebook is built from (and overwritten with) only that many bronze rows.
BRONZE_ROW_LIMIT = None

bronze_path = f"abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/BronzeLayer/day_created=*"
df_bronze = spark.read.format('parquet').load(bronze_path)
if BRONZE_ROW_LIMIT is not None:
    df_bronze = df_bronze.limit(BRONZE_ROW_LIMIT)

In [0]:
# Give the generic bronze column names table-specific names before the silver tables are built.
bronze_renames = [
    ('name', 'repo_name'),
    ('type', 'action_type'),
    ('date_created', 'action_date_created'),
    ('header_id', 'action_id'),
]
for old_name, new_name in bronze_renames:
    df_bronze = df_bronze.withColumnRenamed(old_name, new_name)

In [0]:
# Expose the renamed bronze DataFrame to Spark SQL under the name raw_layer.
df_bronze.createOrReplaceTempView('raw_layer')

# Creating Tables

<p>Create the tables as defined in the Silver_erd, datatypes being autodetermined.<br>Dropping columns where more than 15% is null</p>

## Creating Organization Table

In [0]:
# Target schema of the organization table: a numeric id and its login, both nullable.
org_schema = (
    StructType()
    .add('org_id', LongType(), True)
    .add('org_login', StringType(), True)
)

In [0]:
# Project the organization columns and cast them to the types declared in org_schema.
# Casting inside the DataFrame API avoids the previous round-trip through a Python RDD,
# which pushed every row through Python just to re-apply a schema.
df_org = df_bronze.select(*[col(field.name).cast(field.dataType) for field in org_schema.fields])


## Creating Actor Table

In [0]:
# It should turn into: 
# actor_id, actor_login, everything else is derived

In [0]:
# Actor table seed: the id and login of whoever triggered each event.
df_actor = spark.table('raw_layer').select('actor_id', 'actor_login')

## Creating Action Table

In [0]:
# Action table: the event header columns plus one foreign key per table an event can point to.
action_header_cols = ['action_id', 'action_type', 'action_date_created', 'payload.action']

# (source column, foreign-key column name in the action table)
action_fk_cols = [
    ('actor_id', 'action_actor_id'),
    ('org_id', 'action_org_id'),
    ('repo_id', 'action_repo_id'),
    ('payload.comment.id', 'action_comment_id'),
    ('payload.pull_request.id', 'action_pull_request_id'),
    ('payload.forkee.id', 'action_forkee_id'),
    ('payload.issue.id', 'action_issue_id'),
    ('payload.release.id', 'action_release_id'),
    ('payload.review.id', 'action_review_id'),
]

df_action = (
    df_bronze
    .select(*action_header_cols, *[col(source).alias(target) for source, target in action_fk_cols])
    # action_id is cast to a long after the projection
    .withColumn('action_id', col('action_id').cast(LongType()))
)


## Creating Review Table

In [0]:
# Review table: flatten payload.review for the events that carry one and prefix its columns
# with review_. submitted_at is exposed as created_at so the timestamp step can pick it up,
# and the nested user struct is reduced to its id (review_actor_id).
# NOTE(review): the output column 'review_author_assoication' is misspelled ("association");
# it is left as-is here because consumers of the silver layer may already use that name.
df_review = spark.sql("""SELECT author_association as review_author_assoication, body as review_body, 
  commit_id as review_commit_id, html_url as review_html_url, 
  id as review_id, node_id as review_node_id, pull_request_url as review_pull_request_url, state as review_state, submitted_at as created_at, user.id as review_actor_id FROM 
  (SELECT payload.review.* FROM raw_layer
  WHERE payload.review IS NOT NULL) t1""")

## Creating Repo Table

In [0]:

# Target schema of the repository table; every field is nullable.
repo_schema = (
    StructType()
    .add('repo_id', LongType(), True)
    .add('repo_name', StringType(), True)
    .add('public', BooleanType(), True)
    .add('repo_url', StringType(), True)
)

In [0]:
# Project the repository columns and cast them to the types declared in repo_schema.
# Casting inside the DataFrame API avoids the previous round-trip through a Python RDD,
# which pushed every row through Python just to re-apply a schema.
df_repo = df_bronze.select(*[col(field.name).cast(field.dataType) for field in repo_schema.fields])


## Creating Comments Table


### Dropped columns and why
<ul>
  <li>Start_line: Used when comments are referencing specific lines from files.</li>
  <li>Start_side: Dictates which way the text will read from, and as such is not useful to us.</li>
</ul>

In [0]:
# Comment table: flatten payload.comment for the events that carry one.
comment_structs = df_bronze.filter('payload.comment IS NOT NULL').select('payload.comment.*')

# Derived columns. These are computed before the renames below because they still read
# the original field names (issue_url in particular).
comment_derived = (
    comment_structs
    .withColumn('user', col('user.id'))  # keep only the author's id
    .withColumn(
        'comment_pull_request_url',
        # use the pull-request link when present, otherwise fall back to the issue url
        coalesce(col('_links.pull_request.href'), col('issue_url')),
    )
    .withColumn('reactions_url', col('reactions.url'))
)

# created_at and updated_at keep their names; everything else gets a comment-specific name.
comment_renames = [
    ('html_url', 'comment_html_url'),
    ('url', 'comment_self_url'),
    ('id', 'comment_id'),
    ('in_reply_to_id', 'in_reply_to_comment_id'),
    ('line', 'comment_line'),
    ('path', 'comment_path'),
    ('diff_hunk', 'comment_diff_hunk'),
    ('body', 'comment_body'),
    ('issue_url', 'comment_issue_url'),
]
for old_name, new_name in comment_renames:
    comment_derived = comment_derived.withColumnRenamed(old_name, new_name)

# The nested structs have been reduced to the columns above, so they can go.
df_comments = comment_derived.drop('side', '_links', 'performed_via_github_app', 'reactions')

In [0]:
# Remove fully identical rows first, then drop the multi-line-comment columns
# (see "Dropped columns and why" above).
unique_comments = df_comments.distinct()
df_comments = unique_comments.drop('start_line', 'start_side', 'original_start_line')

## Creating Issue Table

<ul>
  <li>Dropping assignee because they are simply in the array of assignees.</li>
  <li>Dropping active_lock_reason and performed_via_github_app because they were mainly nulls.</li>
</ul>

In [0]:
# Issue table: flatten payload.issue for the events that carry one, minus the columns
# listed in the notes above.
df_issue = (
    df_bronze
    .filter('payload.issue IS NOT NULL')
    .select('payload.issue.*')
    .drop('active_lock_reason', 'assignee', 'performed_via_github_app')
)


## Pull Request

In [0]:
%sql
-- Store the flattened pull_request payload of every event that has one as its own table
CREATE OR REPLACE TABLE pull_request AS
SELECT payload.pull_request.*
FROM raw_layer
WHERE payload.pull_request IS NOT NULL

num_affected_rows,num_inserted_rows


In [0]:
# Build the silver pull-request table from the pull_request table created above:
#   * nested structs (base, head, _links, auto_merge, merged_by, milestone) are reduced to
#     scalar columns, mostly ids and urls, and given a pull_request_ prefix
#   * created_at / updated_at keep their names so the timestamp step can join on them
#   * fully identical rows are removed with drop_duplicates()
silver_pull_request = spark.sql("""
SELECT 
    base.label as pull_request_base_name, base.repo.id as pull_request_repo_id, base.user.id as pull_request_user_id,
    created_at, updated_at, id as pull_request_id,
    _links.comments.href as pull_request_comments_url,_links.commits.href as pull_request_commits_url,_links.html.href as pull_request_html_url,_links.self.href as pull_request_self_url,_links.statuses.href pull_request_status_url,
    auto_merge.commit_message as pull_request_auto_merge_commit_message, auto_merge.commit_title as pull_request_auto_merge_commit_title, auto_merge.enabled_by.id as pull_request_auto_merge_enabled_by, auto_merge.merge_method as pull_request_auto_merge_method,
    merged_by.id as pull_request_merged_by, milestone.id as pull_request_milestone_id,
    head.label as pull_request_head_name,head.repo.id as pull_request_head_repo_id, head.user.id as pull_request_head_user_id
FROM pull_request
""").drop_duplicates()

## Milestone Table

In [0]:
# Milestones referenced by issues: flatten the struct, keep rows that have an id,
# prefix the columns with milestone_ and keep one row per milestone_id.
issue_milestone_renames = [
    ("closed_at", "milestone_closed_at"),
    ("closed_issues", "milestone_closed_issues"),
    ("description", "milestone_description"),
    ("due_on", "milestone_due_on"),
    ("html_url", "milestone_html_url"),
    ("id", "milestone_id"),
    ("labels_url", "milestone_labels_url"),
    ("node_id", "milestone_node_id"),
    ("number", "milestone_number"),
    ("open_issues", "milestone_open_issues"),
    ("state", "milestone_state"),
    ("title", "milestone_title"),
    ("url", "milestone_url"),
]

silver_milestone = df_issue.select('milestone.*').dropna(subset="id")
for source_name, target_name in issue_milestone_renames:
    silver_milestone = silver_milestone.withColumnRenamed(source_name, target_name)
silver_milestone = silver_milestone.dropDuplicates(subset=['milestone_id'])

In [0]:
# Add the milestones referenced by pull requests to the issue milestones.
#
# Changes from the earlier form of this cell:
#   * rows are filtered on payload.pull_request.milestone BEFORE the struct is flattened,
#     instead of filtering on a column the projection had already removed
#   * unionByName matches columns by name, so a different field order in the two milestone
#     structs can no longer silently put values in the wrong column (plain union is positional)
pr_milestone_renames = [
    ("closed_at", "milestone_closed_at"),
    ("closed_issues", "milestone_closed_issues"),
    ("description", "milestone_description"),
    ("due_on", "milestone_due_on"),
    ("html_url", "milestone_html_url"),
    ("id", "milestone_id"),
    ("labels_url", "milestone_labels_url"),
    ("node_id", "milestone_node_id"),
    ("number", "milestone_number"),
    ("open_issues", "milestone_open_issues"),
    ("state", "milestone_state"),
    ("title", "milestone_title"),
    ("url", "milestone_url"),
]

pr_milestones = (
    df_bronze
    .filter('payload.pull_request.milestone IS NOT NULL')
    .select('payload.pull_request.milestone.*')
    .dropna(subset=['id'])
)
for source_name, target_name in pr_milestone_renames:
    pr_milestones = pr_milestones.withColumnRenamed(source_name, target_name)

# Duplicate milestone_ids introduced here are removed later, when the creator is split off.
silver_milestone = silver_milestone.unionByName(pr_milestones)


## Teams Table

In [0]:
%sql
-- One row per requested team: explode the requested_teams array, then flatten each team struct
CREATE OR REPLACE TEMPORARY VIEW team_view AS
SELECT col.*
FROM (
  SELECT explode(requested_teams)
  FROM pull_request
  WHERE requested_teams IS NOT NULL
) t1

In [0]:
%sql
DROP TABLE IF EXISTS teams;
-- Build the teams table: one row per requested team plus one row per parent team.
-- UNION (not UNION ALL) removes rows that are identical in every column.
CREATE OR REPLACE TABLE teams AS 
SELECT * FROM (
    -- The requested teams themselves, with renamed columns.
    -- team_repo_id is taken from the team's OWN repositories_url; it used to be read from
    -- parent.repositories_url, which left it NULL for every team that has no parent.
    SELECT description as team_description, html_url as team_html_url, 
  id as team_id, members_url as team_members_url, 
  name as team_name, node_id as team_node_id,
  permission as team_permission, privacy as team_privacy,
  repositories_url as team_repo_url, slug as team_slug,
  url as team_url, regexp_substr(repositories_url,'[0-9]+') as team_repo_id, parent.id as team_parent_id FROM team_view
UNION
-- The parent teams, so every team_parent_id above has a matching row.
-- This branch is matched to the one above by position; it relies on parent.* expanding
-- in the same order as the eleven named columns (TODO confirm against the parent struct).
SELECT parent.*, regexp_substr(parent.repositories_url,'[0-9]+'), null FROM team_view
WHERE parent IS NOT NULL) t1;

num_affected_rows,num_inserted_rows



## Timestamp 2.0 Easy edition

In [0]:
# Empty timestamp table with the target layout; the source tables are appended to it below.
timestamp_schema = (
    StructType()
    .add('time_id', IntegerType(), True)
    .add('created_at', TimestampType(), True)
    .add('updated_at', TimestampType(), True)
    .add('deleted_at', TimestampType(), True)
)
df_timestamp = spark.createDataFrame(data=[], schema=timestamp_schema)

In [0]:
# Collect the timestamp columns of every table that has them and append them, by column
# name, to the empty timestamp table. Columns a source does not provide are filled with null.
#
# Sources: comments, pull requests, issues, milestones, actions (header) and reviews.
timestamp_sources = [
    df_comments.select('created_at', 'updated_at'),
    silver_pull_request.select('created_at', 'updated_at'),
    df_issue.select('created_at', 'updated_at'),
    silver_milestone.select('created_at', 'updated_at'),
    # actions only have a creation date, stored under a different name
    df_action.select('action_date_created').withColumnRenamed('action_date_created', 'created_at'),
    # reviews only have a creation date
    df_review.select('created_at'),
]
for timestamp_source in timestamp_sources:
    df_timestamp = df_timestamp.unionByName(timestamp_source, allowMissingColumns=True)

# One row per distinct combination of timestamps.
df_timestamp = df_timestamp.dropDuplicates()

In [0]:
# Final step for timestamp: number the rows 1, 2, 3, ... to get time_id.
#
# The window is ordered by created_at AND updated_at. Ordering by created_at alone leaves
# ties between rows that share a created_at, and row_number() breaks ties arbitrarily.
# df_timestamp is evaluated lazily and re-used by every join below and by the final write,
# so an arbitrary tie-break could hand the same timestamp a different time_id in different
# tables. The rows are distinct and deleted_at is never populated in this notebook, so the
# two columns together give every row a fixed position.
timestamp_order = Window.orderBy('created_at', 'updated_at')
df_timestamp = df_timestamp.withColumn('time_id', row_number().over(timestamp_order))

### Timestamping all tables

In [0]:
# Will join timestamp and join the original table, then drops the previous columns from both tables
# This will result in time_id being added along each original table

In [0]:
# Swap the pull request's timestamp columns for the matching time_id.
silver_pull_request = (
    silver_pull_request
    .join(df_timestamp, on=['created_at', 'updated_at'], how='inner')
    .drop('created_at', 'updated_at', 'deleted_at')
)

In [0]:
# Swap the comment's timestamp columns for the matching time_id.
df_comments = (
    df_comments
    .join(df_timestamp, on=['created_at', 'updated_at'], how='inner')
    .drop('created_at', 'updated_at', 'deleted_at')
)

In [0]:
# Swap the issue's timestamp columns for the matching time_id.
df_issue = (
    df_issue
    .join(df_timestamp, on=['created_at', 'updated_at'], how='inner')
    .drop('created_at', 'updated_at', 'deleted_at')
)

In [0]:
# Swap the milestone's timestamp columns for the matching time_id.
silver_milestone = (
    silver_milestone
    .join(df_timestamp, on=['created_at', 'updated_at'], how='inner')
    .drop('created_at', 'updated_at', 'deleted_at')
)

In [0]:
# Swap the review's created_at for the matching time_id.
#
# Reviews contributed rows to df_timestamp that have only created_at set, so the lookup is
# restricted to timestamp rows whose updated_at is null. Joining on created_at alone also
# matched rows from other tables that share the same created_at but carry an updated_at,
# which duplicated reviews and attached time_ids that belong to other records.
review_times = df_timestamp.filter(col('updated_at').isNull()).select('created_at', 'time_id')
silver_review = df_review.join(review_times, on='created_at', how='inner').drop('created_at')

In [0]:
# Swap the action's creation date for the matching time_id.
#
# Actions contributed rows to df_timestamp that have only created_at set, so the lookup is
# restricted to timestamp rows whose updated_at is null. Matching on created_at alone also
# hit rows from other tables with the same created_at, which duplicated actions and attached
# time_ids that belong to other records.
action_times = df_timestamp.filter(col('updated_at').isNull()).select('created_at', 'time_id')
df_action = (
    df_action
    .join(action_times, df_action.action_date_created == action_times.created_at, how='inner')
    .drop('action_date_created', 'created_at')
)

## Creating Commits Table

In [0]:
# Layout of the commits table; every field is nullable.
commits_schema = (
    StructType()
    .add('commit_author_email', StringType(), True)
    .add('commit_author_name', StringType(), True)
    .add('commit_distinct', BooleanType(), True)
    .add('commit_message', StringType(), True)
    .add('commit_sha', StringType(), True)
    .add('commit_url', StringType(), True)
)

In [0]:
# Events that carry commits, reduced to the commits array and the id of the owning action.
commit_events = (
    df_bronze
    .filter('payload.commits IS NOT NULL')
    .select('payload.*', 'action_id')
    .select('commits', 'action_id')
)

# One row per commit: explode the array, flatten the commit struct, then flatten its author.
commit_rows = (
    commit_events
    .select(explode(commit_events.commits), 'action_id')
    .select('col.*', 'action_id')
    .select('author.*', 'distinct', 'message', 'sha', 'url', 'action_id')
)

# Prefix the commit columns; action_id keeps its name for the linker table.
commit_renames = [
    ('email', 'commit_author_email'),
    ('name', 'commit_author_name'),
    ('distinct', 'commit_distinct'),
    ('message', 'commit_message'),
    ('sha', 'commit_sha'),
    ('url', 'commit_url'),
]
df_commits = commit_rows
for old_name, new_name in commit_renames:
    df_commits = df_commits.withColumnRenamed(old_name, new_name)

In [0]:
# Linker between actions and commits, keyed on the commit url.
silver_commits_action_linker = df_commits.select('action_id','commit_url')
# The commit table itself no longer needs the action id.
df_commits = df_commits.drop('action_id')

# Cleaning Tables

## Cleaning Org

### Removing Nulls

In [0]:
# Every time org_id is null so is org_login, and every time one exists so does the other,
# so all rows containing any null are dropped.
# (The earlier `df_org.dropna(thresh=3)` line was removed: its result was never assigned,
# so it had no effect on df_org.)
df_org = df_org.na.drop()

### Removing Duplicates

In [0]:
# There are multiple org_logins associated with single org_ids,
# so a linking table is created in the cells below.
#
# The accumulator is created from the active Spark context. The earlier form of this cell
# could not run: `global org_login_accum` placed after the assignment is a SyntaxError, and
# `SparkContext.accumulator(0, 1)` called the method on the class, binding 0 to `self`.
# A name assigned at notebook top level is already global, so no `global` statement is needed.
# NOTE(review): org_login_accum is not read anywhere in the visible notebook - confirm it is still needed.
org_login_accum = spark.sparkContext.accumulator(0)

In [0]:
# Three views of the cleaned organization data:
#   silver_org        - one row per org_id
#   silver_org_linker - every distinct (org_id, org_login) pair
#   silver_org_logins - one row per org_login
silver_org = df_org.dropDuplicates(subset=['org_id'])
silver_org_linker = df_org.distinct()
silver_org_logins = df_org.dropDuplicates(subset=['org_login'])

In [0]:
# The logins table only keeps the login itself; the org_id lives in the linker table.
silver_org_logins = silver_org_logins.drop('org_id')

In [0]:
# Number the logins 1, 2, 3, ... in alphabetical order to get a surrogate key.
silver_org_logins_window = Window.orderBy(col("org_login"))
org_login_rank = row_number().over(silver_org_logins_window)
silver_org_logins = silver_org_logins.withColumn("org_logins_id", org_login_rank)

In [0]:
# Attach the surrogate login key to every (org_id, org_login) pair.
same_org_login = silver_org_linker.org_login == silver_org_logins.org_login
silver_org_linker = silver_org_linker.join(silver_org_logins, on=same_org_login, how='inner')

In [0]:
# The linker references logins through org_logins_id, so the login text is dropped.
silver_org_linker = silver_org_linker.drop('org_login')

## Cleaning Actor

In [0]:
# One row per actor_id.
silver_actor = df_actor.dropDuplicates(subset=['actor_id'])

In [0]:
# Add the authors of comments. union() is positional, so (id, login) lines up with
# (actor_id, actor_login).
comment_authors = (
    df_bronze
    .filter('payload.comment IS NOT NULL')
    .select('payload.comment.*')
    .select('user.id', 'user.login')
)
silver_actor = silver_actor.union(comment_authors)

In [0]:
# Takes every "actor" that appears anywhere in a pull request - including the exploded
# assignees / requested_reviewers arrays - plus the review authors, and appends them to
# silver_actor.

# Each SELECT returns (id, login); the SQL UNIONs already remove duplicates among themselves,
# and drop_duplicates() removes rows that were already present in silver_actor.
silver_actor = silver_actor.union(
spark.sql(
    """
        SELECT `user`.id, `user`.login
        FROM pull_request
        UNION
        SELECT base.user.id, base.user.login FROM pull_request
        UNION
        SELECT base.repo.owner.id, base.repo.owner.login FROM pull_request
        UNION
        SELECT head.user.id, head.user.login FROM pull_request
        UNION 
        SELECT head.repo.owner.id, head.repo.owner.login FROM pull_request
        UNION
        SELECT assignee.id, assignee.login FROM pull_request
        UNION
        SELECT merged_by.id, merged_by.login FROM pull_request
        UNION
        SELECT payload.review.user.id, payload.review.user.login FROM raw_layer WHERE payload.review IS NOT NULL
        UNION
        SELECT col.id, col.login FROM (SELECT explode(assignees) FROM pull_request) t1
        UNION
        SELECT col.id, col.login FROM (SELECT explode(requested_reviewers) FROM pull_request) t1
    """)).drop_duplicates()

## Cleaning Repo

In [0]:
# Three views of the repository data:
#   silver_repo        - one row per repo_id, without the name
#   silver_repo_linker - every distinct (repo_id, repo_name, repo_url) combination
#   silver_repo_names  - one row per repo_name, with its url
silver_repo = df_repo.drop('repo_name').dropDuplicates(subset=['repo_id'])
silver_repo_linker = df_repo.drop('public').distinct()
silver_repo_names = df_repo.drop('repo_id', 'public').dropDuplicates(subset=['repo_name'])

In [0]:
# Number the repository names 1, 2, 3, ... in alphabetical order to get a surrogate key.
silver_repo_names_window = Window.orderBy(col("repo_name"))
repo_name_rank = row_number().over(silver_repo_names_window)
silver_repo_names = silver_repo_names.withColumn("repo_name_id", repo_name_rank)

In [0]:
# Attach repo_name_id to every repository by matching on both name and url, then drop the
# text columns so the linker only holds keys.
same_repo_name = silver_repo_linker.repo_name == silver_repo_names.repo_name
same_repo_url = silver_repo_linker.repo_url == silver_repo_names.repo_url
silver_repo_linker = silver_repo_linker.join(silver_repo_names, [same_repo_name, same_repo_url], 'inner')
silver_repo_linker = silver_repo_linker.drop('repo_name', 'repo_url')

In [0]:
#no columns have a single null!

In [0]:
# Add the base and head repositories of every pull request, then keep one row per repo_id.
# union() is positional: (id, public, url) lines up with (repo_id, public, repo_url).
pr_base_repos = spark.sql('SELECT base.repo.id, NOT base.repo.private as public, base.repo.url FROM pull_request')
pr_head_repos = spark.sql('SELECT head.repo.id, NOT head.repo.private as public, head.repo.url FROM pull_request')
silver_repo = (
    silver_repo
    .union(pr_base_repos)
    .union(pr_head_repos)
    .drop_duplicates(subset=['repo_id'])
)

In [0]:
# Add the names of the repositories referenced by pull requests: first the base repository,
# then the head repository. Both queries used to read head.repo, so base repositories were
# never added here even though the repo table adds both sides.
# NOTE(review): union() is positional, so the third selected column (the GitHub repo id)
# lands in repo_name_id, the surrogate key generated with row_number() - confirm this is intended.
pr_base_repo_names = spark.sql('SELECT base.repo.name, base.repo.url, base.repo.id FROM pull_request')
pr_head_repo_names = spark.sql('SELECT head.repo.name, head.repo.url, head.repo.id FROM pull_request')
silver_repo_names = (
    silver_repo_names
    .union(pr_base_repo_names)
    .union(pr_head_repo_names)
    .dropDuplicates()
)

## Cleaning Commits

### Distinct

In [0]:
# Keep distinct commits and number them, ordered by author e-mail, to get commit_id.
silver_commits_window = Window.orderBy(col("commit_author_email"))
silver_commits = (
    df_commits
    .distinct()
    .withColumn("commit_id", row_number().over(silver_commits_window))
)

## Cleaning Issue

In [0]:
# Issue table: rename the key and url, replace the milestone and user structs with their ids,
# drop the arrays/structs that get their own tables, and keep one row per issue_id.
silver_issue = (
    df_issue
    .withColumnRenamed('id', 'issue_id')
    .withColumn('issue_milestone_id', col('milestone.id'))
    .withColumn('issue_user_id', col('user.id'))
    .withColumnRenamed('url', 'issue_url')
    .drop('assignees', 'labels', 'milestone', 'user')
    .dropDuplicates(subset=['issue_id'])
)

In [0]:
# Flattened _links struct of every pull request, and flattened pull_request struct of every issue.
# NOTE(review): neither `links` nor `pull_request` is referenced again in the visible
# notebook - confirm whether these are still needed.
links = spark.sql("SELECT _links.* FROM pull_request")
pull_request = silver_issue.select('pull_request.*')

### Issue_Reactions_table

In [0]:
# Split the reactions struct into its own table (one row per url) ...
silver_issue_reactions = (
    silver_issue
    .select('reactions.*')
    .dropDuplicates(subset=['url'])
    .withColumnRenamed('url', 'reaction_url')
)

# ... and leave only the url behind on the issue as the reference to it.
issue_reactions_ref = col('reactions.url').alias('reactions_url')
silver_issue = silver_issue.select('*', issue_reactions_ref).drop('reactions')

### Issue pull_request Table

In [0]:
# Split the pull_request struct into its own table (one row per url) ...
silver_issue_pull_request = (
    silver_issue
    .select('pull_request.*')
    .dropDuplicates(subset=['url'])
    .withColumnRenamed('url', 'pull_request_url')
)

# ... and leave only the url behind on the issue as the reference to it.
issue_pull_request_ref = col('pull_request.url').alias('pull_request_url')
silver_issue = silver_issue.select('*', issue_pull_request_ref).drop('pull_request')

### Assignee Table and silver_issue_assignee_linker


<ul>
  <li>Not dropping duplicates in the linker table because I don't want to lose how many times a user was assigned to a single issue.</li>
  <li>I am dropping rows with nulls in any column though, because it's a linker table and such rows would be useless.</li>
</ul>

In [0]:
# Linker between issues and their assignees: one row per (assignee, issue) occurrence.
# Duplicates are kept on purpose; rows with a null on either side are removed.
issue_assignee_pairs = df_issue.select(
    explode_outer('assignees').alias('assignee'),
    col('id').alias('issue_id'),
)
silver_issue_assignee_linker = (
    issue_assignee_pairs
    .select(col('assignee.id').alias('assiginee_id'), 'issue_id')
    .na.drop()
)

In [0]:
# Linker between pull requests and their assignees: explode the assignees array next to the
# pull request id and keep the distinct (pull_request_id, pull_request_assignee_id) pairs.
silver_pull_request_assignee = spark.sql('SELECT pr_id as pull_request_id, col.id as pull_request_assignee_id FROM (SELECT id as pr_id, EXPLODE(assignees) FROM pull_request) t1').dropDuplicates()

In [0]:
# Assignee table seeded from issues: one row per assignee id with its login.
issue_assignees = df_issue.select(explode('assignees')).select('col.*')
silver_assignee = issue_assignees.select('id', 'login').dropDuplicates(subset=['id'])

In [0]:
# Add the assignees found on pull requests, then remove identical (id, login) rows.
pr_assignees = spark.sql('SELECT col.id, col.login FROM (SELECT EXPLODE(assignees) FROM pull_request) t1')
silver_assignee = silver_assignee.union(pr_assignees).dropDuplicates()

### Labels Table and silver_issue_label_linker

In [0]:
# Linker between issues and their labels: one row per (label, issue) occurrence;
# rows with a null on either side are removed.
issue_label_pairs = df_issue.select(
    explode_outer('labels').alias('lable'),
    col('id').alias('issue_id'),
)
silver_issue_label_linker = (
    issue_label_pairs
    .select(col('lable.id').alias('lable_id'), 'issue_id')
    .na.drop()
)

In [0]:
# Label table seeded from issues: one row per label id, columns prefixed with label_.
label_renames = [
    ('id', 'label_id'),
    ('color', 'label_color'),
    ('default', 'label_default'),
    ('description', 'label_description'),
    ('name', 'label_name'),
    ('node_id', 'label_node_id'),
    ('url', 'label_url'),
]
silver_label = df_issue.select(explode('labels')).select('col.*').dropDuplicates(['id'])
for old_name, new_name in label_renames:
    silver_label = silver_label.withColumnRenamed(old_name, new_name)

In [0]:
# Add the labels found on pull requests (appended by position), then keep one row per
# (label_id, label_node_id).
pr_labels = spark.sql('SELECT col.* FROM (SELECT explode(labels) FROM pull_request)')
silver_label = silver_label.union(pr_labels)
silver_label = silver_label.drop_duplicates(subset=['label_id', 'label_node_id'])

### Milestone Table

#### User Table add on

In [0]:
# Split the milestone creator into its own table (one row per creator id) ...
silver_creator = silver_milestone.select('creator.*').dropDuplicates(subset=['id'])

# ... and keep only the creator's id on the milestone, one row per milestone_id.
silver_milestone = (
    silver_milestone
    .withColumn('milestone_user_id', col('creator.id'))
    .drop('creator')
    .dropDuplicates(subset=['milestone_id'])
)

In [0]:
# Keep one row per creator id and one row per milestone_id.
# NOTE(review): the previous cell already applies both of these de-duplications.
silver_creator = silver_creator.dropDuplicates(subset=['id'])
silver_milestone = silver_milestone.dropDuplicates(subset=['milestone_id'])

### User Table

In [0]:
# Flatten the user struct of every issue row and drop gravatar_id.
# NOTE(review): no de-duplication is applied here, unlike the other user-like tables - confirm this is intended.
silver_issue_user = df_issue.select('user.*').drop('gravatar_id')

## Cleaning Pull Request

In [0]:
# Linking table between pull_request and actor (requested_reviewer):
# explode requested_reviewers next to the pull request id (exposed as payload_id) and keep
# the distinct (payload_id, requested_reviewer_user_id) pairs.
silver_pull_request_reviewer_linking = spark.sql('SELECT t1.payload_id, t1.col.id as requested_reviewer_user_id FROM (SELECT id as payload_id, explode(requested_reviewers) FROM pull_request WHERE requested_reviewers IS NOT NULL) t1').dropDuplicates()

In [0]:
# Linking table between pull request and label:
# explode labels next to the pull request id and keep the distinct (pull_request_id, labels_id) pairs.
silver_pull_request_label_linking = spark.sql('SELECT id as pull_request_id, col.id as labels_id FROM (SELECT id, explode(labels) FROM pull_request) t1').dropDuplicates()

In [0]:
# Linking table between pull_request and teams:
# explode requested_teams next to the pull request id and keep the distinct
# (pull_request_id, team_id) pairs. The backslash continues the SQL string onto the next line.
silver_pull_request_teams = spark.sql('SELECT id as pull_request_id, col.id as team_id FROM (SELECT id, explode(requested_teams) FROM pull_request \
WHERE requested_teams IS NOT NULL) t1').dropDuplicates()

## Actions Table

In [0]:
# The action table needs no further cleaning; expose it under its silver name.
silver_action = df_action

## Cleaning Comments

In [0]:
# One row per comment_id.
silver_comment = df_comments.dropDuplicates(subset=['comment_id'])

# Reactions struct of every bronze row's comment, flattened, with url renamed to reaction_url.
comment_reaction_fields = df_bronze.select('payload.comment.reactions.*')
silver_comment_reactions = comment_reaction_fields.withColumnRenamed('url', 'reaction_url')

## Amalgamating Reactions

In [0]:
# Combine comment reactions and issue reactions into one table, matching columns by name.
silver_reactions = silver_comment_reactions.unionByName(silver_issue_reactions)

## Amalgamating Users

In [0]:
# Seed the users table from comment authors, assignees and added members.
#
# Rows without an id are dropped AFTER all three sources are combined. The null filter used
# to run before the member union, so every bronze row without a payload.member put a
# (null, null) row back into the result.
comment_users = df_bronze.select('payload.comment.user.*').select('id', 'login')
member_users = df_bronze.select('payload.member.id', 'payload.member.login')

silver_users = (
    comment_users
    .unionByName(silver_assignee)
    .select('id', 'login')
    .unionByName(member_users)
    .dropna(subset=['id'])
    .withColumnRenamed('id', 'user_id')
    .withColumnRenamed('login', 'user_login')
)

In [0]:
# Reduce milestone creators to (id, login) and append them to the users.
# union() is positional, so (id, login) lines up with (user_id, user_login).
silver_creator = silver_creator.select('id','login')
silver_users = silver_users.union(silver_creator)

In [0]:
# Final users table: actors plus every user collected above, one row per user_id.
#
# Rows without a user_id are removed before the de-duplication. Several of the sources merged
# into silver_actor can yield a null id (for example a pull request with no assignee or not
# yet merged), and dropDuplicates on user_id would otherwise keep one such row in the table.
silver_users = (
    silver_actor
    .withColumnRenamed('actor_id', 'user_id')
    .withColumnRenamed('actor_login', 'user_login')
    .unionByName(silver_users)
    .dropna(subset=['user_id'])
    .dropDuplicates(subset=['user_id'])
)

# Writing to disk

In [0]:
%sh
# Print the total size of each entry under the Silver_layer folder on DBFS
# (du -s: one summary line per entry, -h: human-readable units)

for item in /dbfs/FileStore/JA/Silver_layer/*
do
    du -sh "$item"
done

75M	/dbfs/FileStore/JA/Silver_layer/silver_action
317K	/dbfs/FileStore/JA/Silver_layer/silver_assignee
347M	/dbfs/FileStore/JA/Silver_layer/silver_commits
51K	/dbfs/FileStore/JA/Silver_layer/silver_creator
158M	/dbfs/FileStore/JA/Silver_layer/silver_issue
580K	/dbfs/FileStore/JA/Silver_layer/silver_issue_assignee_linker
2.3M	/dbfs/FileStore/JA/Silver_layer/silver_issue_label_linker
5.5M	/dbfs/FileStore/JA/Silver_layer/silver_issue_pull_request
4.4M	/dbfs/FileStore/JA/Silver_layer/silver_issue_reactions
36M	/dbfs/FileStore/JA/Silver_layer/silver_issue_user
5.2M	/dbfs/FileStore/JA/Silver_layer/silver_label
477K	/dbfs/FileStore/JA/Silver_layer/silver_milestone
509K	/dbfs/FileStore/JA/Silver_layer/silver_org_linker
717K	/dbfs/FileStore/JA/Silver_layer/silver_org_logins
74M	/dbfs/FileStore/JA/Silver_layer/silver_pull_request
141K	/dbfs/FileStore/JA/Silver_layer/silver_pull_request_assignee
1.1M	/dbfs/FileStore/JA/Silver_layer/silver_pull_request_label_linking
377K	/dbfs/FileStore/JA/Silver_

In [0]:
# Sizing notes kept as a bare string: how the repartition counts used in the write cells
# below were estimated from the folder sizes listed above.
"""
The % is the percentage size of one day (day 5) compared to the bronze layer
Then we scale this number up by the Total Database size divided by the one day size to get the estimated size of each table. 
Finally we take this number and divide it by the desired size (120Mb) to get the partition size per table.

Total Database: 49.2Gb 
Original Bronze Size: 3957.59Mb (~4%)
commits: 347Mb 8.77% 36 repartitions
issue: 158Mb 4% 18
actions: 75Mb 1.9% 18
pull_request: 74Mb 1.9% 18
repo_names: 37Mb 0.94% 9
issue_user: 36Mb 0.94% 9
repo: 19Mb 0.45% 5
users: 18Mb 0.45% 5
timestamp: 7.6Mb 0.19% 2
repo_linker(?): 6.3Mb 1
issue_pull: 5.5Mb 1
label: 5.2Mb 1
issue_reactions: 4.4Mb 1
issue_label: 2.3Mb 1
pull_request_label: 1.1Mb 1
org_login: 717Kb 1
issue_assignee: 580Kb 1
org_linker: 509Kb 1
milestone: 477Kb 1
pull_request_reviewer: 377Kb 1
assignee: 317Kb 1
pull_request_assigne: 141Kb 1
creator: 51Kb 1
pull_request_teams: 37Kb 1
"""

In [0]:
# Root folder of the silver layer in the data lake; the write cells below save beneath it.
# NOTE(review): despite the name, this points at the SilverLayer folder itself, and the
# writes below use mode('overwrite').
test_path = f'abfss://{contname}@{storage_acct_name}.dfs.core.windows.net/SilverLayer'

In [0]:
# Persist the cleaned comment table. silver_comment (one row per comment_id) is written
# rather than df_comments: the de-duplicated frame was built in "Cleaning Comments" but was
# never saved, so the stored table could contain the same comment_id more than once.
(silver_comment
    .repartition(34)
    .write
    .mode('overwrite')
    .format('parquet')
    .save(f'{test_path}/silver_comments'))

In [0]:
# Save the timestamp table as parquet, repartitioned into 3 partitions.
(df_timestamp
    .repartition(3)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_timestamp'))

In [0]:
# Save the action table as parquet, repartitioned into 18 partitions.
(silver_action
    .repartition(18)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_action'))

In [0]:
# Save the commits table as parquet, repartitioned into 36 partitions.
(silver_commits
    .repartition(36)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_commits'))

In [0]:
# Save the issue table as parquet, repartitioned into 18 partitions.
(silver_issue
    .repartition(18)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_issue'))

In [0]:
# Save the issue/assignee linker as parquet in a single partition.
(silver_issue_assignee_linker
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_issue_assignee_linker'))

In [0]:
# Save the issue/label linker as parquet in a single partition.
(silver_issue_label_linker
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_issue_label_linker'))

In [0]:
# Save the issue pull_request table as parquet in a single partition.
(silver_issue_pull_request
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_issue_pull_request'))

In [0]:
# Save the issue reactions table as parquet in a single partition.
(silver_issue_reactions
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_issue_reactions'))

In [0]:
# Save the issue user table as parquet. Every other table is repartitioned before it is
# written; this one was not, so it is now repartitioned into the 9 partitions that the
# sizing notes in this notebook list for issue_user.
(silver_issue_user
    .repartition(9)
    .write
    .mode('overwrite')
    .format('parquet')
    .save(f'{test_path}/silver_issue_user'))

In [0]:
# Save the label table as parquet in a single partition.
(silver_label
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_label'))

In [0]:
# Save the milestone table as parquet in a single partition.
(silver_milestone
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_milestone'))

In [0]:
# Save the organization linker as parquet in a single partition.
(silver_org_linker
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_org_linker'))

In [0]:
# Save the organization logins table as parquet in a single partition.
(silver_org_logins
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_org_logins'))

In [0]:
# Save the pull request table as parquet, repartitioned into 22 partitions.
(silver_pull_request
    .repartition(22)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_pull_request'))

In [0]:
# silver_pull_request_assignee.write.mode('overwrite').format('parquet').save(f'{test_path}/silver_pull_request_assignee')

In [0]:
# Save the pull request/reviewer linker as parquet in a single partition.
(silver_pull_request_reviewer_linking
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_pull_request_reviewer_linking'))

In [0]:
# Save the pull request/label linker as parquet in a single partition.
(silver_pull_request_label_linking
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_pull_request_label_linking'))

In [0]:
# Save the pull request/team linker as parquet in a single partition.
(silver_pull_request_teams
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_pull_request_teams'))

In [0]:
# Save the repository table as parquet, repartitioned into 5 partitions.
(silver_repo
    .repartition(5)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_repo'))

In [0]:
# Save the repository linker as parquet in a single partition.
(silver_repo_linker
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_repo_linker'))

In [0]:
# Save the repository names table as parquet, repartitioned into 9 partitions.
(silver_repo_names
    .repartition(9)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_repo_names'))

In [0]:
# Save the users table as parquet, repartitioned into 5 partitions.
(silver_users
    .repartition(5)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_users'))

In [0]:
# Save the commit/action linker as parquet, repartitioned into 38 partitions.
# test_path is the same SilverLayer root that this path used to spell out in full.
(silver_commits_action_linker
    .repartition(38)
    .write
    .mode('overwrite')
    .parquet(f'{test_path}/silver_commits_action_linker'))