In [0]:
%run "./ADLS Setup Variables_SP"

## Imports and Loads

In [0]:
from pyspark.sql.functions import col, when, day,expr,explode
from pyspark.sql.types import BooleanType, StringType, TimestampType, IntegerType, StructType, StructField, LongType
import sparknlp
from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp.common import *

from sparknlp import Finisher
from sparknlp.pretrained import PretrainedPipeline

In [0]:
# Load both bronze Parquet extracts and stack them into a single DataFrame.
# bronze_path1 / bronze_path2 are presumably defined by the setup notebook run above.
df1 = spark.read.format('parquet').load(bronze_path1)
df2 = spark.read.format('parquet').load(bronze_path2)
# unionByName aligns columns by name. Plain union() pairs them by position and
# would silently misalign data if the two extracts list their columns in a
# different order.
df = df1.unionByName(df2)

#####=============================================================================================================================================================

# Actor Table
- Add is_bot column

In [0]:
# Actor table: one row per actor id, with a flag for logins containing "[bot]".
actor_df = (
    df.select(col("actor.id"), col("actor.login"), col("actor.display_login"))
    .na.drop(subset=["id"])       # discard rows without an actor id
    .drop_duplicates(["id"])      # keep a single row per actor id
    .withColumn(
        "is_bot",
        # A null login gives a null condition, which falls through to False.
        when(col("login").contains("[bot]"), True).otherwise(False),
    )
)

In [0]:
# Explicit layout for the actor table; every column is declared non-nullable.
schema = (
    StructType()
    .add("id", LongType(), False)
    .add("login", StringType(), False)
    .add("display_login", StringType(), False)
    .add("is_bot", BooleanType(), False)
)
# Rebuild the DataFrame from its underlying RDD using the explicit schema.
actor_df = spark.createDataFrame(actor_df.rdd, schema=schema)

In [0]:
# Actor Table to Storage
# Writes the actor table as Parquet in 2 partitions, overwriting any existing
# output under silver_path. The former "header" option was removed: it is a
# CSV option and has no effect on a Parquet write.
(
    actor_df.repartition(2)
    .write.format("parquet")
    .mode("overwrite")
    .save(silver_path + "ActorTable")
)

# Event Table

In [0]:
# Event table: one row per event, carrying the ids of its actor, repo and org.
event_columns = [
    col("id").cast("long"),
    col("type").alias("event_type"),
    col("actor.id").alias("actor_id"),
    col("repo.id").alias("repo_id"),
    col("org.id").alias("org_id"),
    col("public"),
    col("created_at"),
]
# Rows whose id is null after the cast to long are discarded.
event_df = df.select(*event_columns).na.drop(subset=["id"])


In [0]:
# (column name, Spark type, nullable) for each column of the event table.
event_fields = [
    ("id", LongType(), False),
    ("event_type", StringType(), False),
    ("actor_id", LongType(), True),
    ("repo_id", LongType(), False),
    ("org_id", LongType(), True),
    ("public", BooleanType(), False),
    ("created_at", TimestampType(), False),
]
schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in event_fields]
)
# Rebuild the DataFrame from its underlying RDD using the explicit schema.
event_df = spark.createDataFrame(event_df.rdd, schema=schema)

In [0]:
# Event Table to Storage
# Writes the event table as Parquet in 13 partitions, overwriting any existing
# output under silver_path. The former "header" option was removed: it is a
# CSV option and has no effect on a Parquet write.
(
    event_df.repartition(13)
    .write.format("parquet")
    .mode("overwrite")
    .save(silver_path + "EventTable")
)

# Org Table

In [0]:
# Org table: one row per organisation id; events without an org are dropped.
org_df = (
    df.select(col("org.id"), col("org.login"), col("org.url"))
    .na.drop(subset=["id"])
    .drop_duplicates(["id"])
)

In [0]:
# Explicit layout for the org table: id is required, login and url may be null.
schema = StructType(
    [
        StructField("id", LongType(), nullable=False),
        StructField("login", StringType(), nullable=True),
        StructField("url", StringType(), nullable=True),
    ]
)
# Rebuild the DataFrame from its underlying RDD using the explicit schema.
org_df = spark.createDataFrame(org_df.rdd, schema=schema)

In [0]:
# Org Table to Storage
# Writes the org table as Parquet in a single partition, overwriting any
# existing output under silver_path. The former "header" option was removed:
# it is a CSV option and has no effect on a Parquet write.
(
    org_df.repartition(1)
    .write.format("parquet")
    .mode("overwrite")
    .save(silver_path + "OrgTable")
)

# Repo Table

In [0]:
# Repo table: one row per repository id.
# Rows with a null repo id are dropped first, as the Actor and Org tables do.
# Otherwise a null id survives dropDuplicates and conflicts with the
# non-nullable `id` column declared for this table.
repo_df = (df.select("repo.id", "repo.name", "repo.url")
             .dropna(subset=["id"])
             .dropDuplicates(["id"]))

In [0]:
# Explicit layout for the repo table: id is required, name and url may be null.
schema = (
    StructType()
    .add("id", LongType(), nullable=False)
    .add("name", StringType())
    .add("url", StringType())
)
# Rebuild the DataFrame from its underlying RDD using the explicit schema.
repo_df = spark.createDataFrame(repo_df.rdd, schema=schema)

In [0]:
# Repo Table to Storage
# Writes the repo table as Parquet in 7 partitions, overwriting any existing
# output under silver_path. The former "header" option was removed: it is a
# CSV option and has no effect on a Parquet write.
(
    repo_df.repartition(7)
    .write.format("parquet")
    .mode("overwrite")
    .save(silver_path + "RepoTable")
)

# PushEvent Table

- Add is_main column

In [0]:
# PushEvent table: one row per push event that carries at least one commit.
push_event_df = (
    df.filter(col("type") == "PushEvent")
    .select(
        col("id").cast("long").alias("event_id"),
        "payload.push_id",
        "payload.size",
        "payload.distinct_size",
        col("payload.ref").alias("branch_ref"),
        # Only the first commit of the push is kept.
        col("payload.commits")[0].sha.alias("commit_id"),
    )
    # Pushes whose first commit has no sha (e.g. no commits) are dropped.
    .dropna(subset=["commit_id"])
    .withColumn(
        "is_main",
        # Match the whole branch name, with or without the "refs/heads/" prefix.
        # The previous endswith("main") / endswith("master") checks also flagged
        # branches such as "refs/heads/domain" or "refs/heads/webmaster".
        # A null branch_ref falls through to False.
        when(col("branch_ref").rlike(r"^(refs/heads/)?(main|master)$"), True)
        .otherwise(False),
    )
)


In [0]:
# Explicit layout for the push-event table; size and distinct_size may be null.
schema = (
    StructType()
    .add("event_id", LongType(), nullable=False)
    .add("push_id", LongType(), nullable=False)
    .add("size", IntegerType())
    .add("distinct_size", IntegerType())
    .add("branch_ref", StringType(), nullable=False)
    .add("commit_id", StringType(), nullable=False)
    .add("is_main", BooleanType(), nullable=False)
)
# Rebuild the DataFrame from its underlying RDD using the explicit schema.
push_event_df = spark.createDataFrame(push_event_df.rdd, schema=schema)

In [0]:
# Push Table to Storage
# Writes the push-event table as Parquet in 16 partitions, overwriting any
# existing output under silver_path. The former "header" option was removed:
# it is a CSV option and has no effect on a Parquet write.
(
    push_event_df.repartition(16)
    .write.format("parquet")
    .mode("overwrite")
    .save(silver_path + "PushEventTable")
)

# Commit Table

- TODO: optimize language detection in a future sprint

In [0]:
# Input for language detection: the first commit message of every PushEvent.
# NOTE(review): "commit_id" is filled from payload.push_id here, whereas the
# PushEvent table's commit_id is the first commit's sha. Confirm which key is
# intended before joining the two tables.
first_commit_message = col("payload.commits").getItem(0).getField("message")
commit_df1 = (
    df.where(col("type") == "PushEvent")
    .select(
        col("payload.push_id").alias("commit_id"),
        first_commit_message.alias("commit_message"),
    )
    .na.drop(subset=["commit_message"])
    # Keep only messages containing at least one ASCII letter or digit.
    .where(col("commit_message").rlike('[a-zA-Z0-9]'))
)


In [0]:
# Stage 1: turn the raw commit message text into a `document` annotation.
document_assembler = DocumentAssembler()
document_assembler.setInputCol("commit_message").setOutputCol("document")

# Stage 2: pretrained language detector reading `document` and writing `lang`,
# with its threshold set to 0.6.
languageDetector = LanguageDetectorDL.pretrained("ld_wiki_tatoeba_cnn_43")
languageDetector.setInputCols("document").setOutputCol("lang").setThreshold(0.6)

# Stage 3: finisher over the `lang` annotations, with metadata excluded.
finisher = Finisher()
finisher.setInputCols(["lang"])
finisher.setIncludeMetadata(False)

# Chain the three stages, in order, into one Spark ML pipeline.
nlpPipeline = Pipeline().setStages([document_assembler, languageDetector, finisher])


ld_wiki_tatoeba_cnn_43 download started this may take some time.
Approximate size to download 7.5 MB
[ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][ | ][ / ][ — ][ \ ][OK!]


In [0]:
# Fit the pipeline on the commit messages, then annotate those same rows.
language_model = nlpPipeline.fit(commit_df1)
result = language_model.transform(commit_df1)

In [0]:
# Keep the key plus the first element of `finished_lang`, exposed as `language`.
commit_df = result.select(
    col("commit_id"),
    col("finished_lang").getItem(0).alias("language"),
)

In [0]:
# Commit Table to Storage
# Writes the commit table as Parquet in 16 partitions, overwriting any existing
# output under silver_path. The former "header" option was removed: it is a
# CSV option and has no effect on a Parquet write.
(
    commit_df.repartition(16)
    .write.format("parquet")
    .mode("overwrite")
    .save(silver_path + "CommitTable")
)